penumbra_dex/component/
dex.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use cnidarium::{StateRead, StateWrite};
use cnidarium_component::Component;
use penumbra_asset::asset;
use penumbra_asset::{Value, STAKING_TOKEN_ASSET_ID};
use penumbra_fee::component::StateWriteExt as _;
use penumbra_fee::Fee;
use penumbra_num::Amount;
use penumbra_proto::{DomainType as _, StateReadProto, StateWriteProto};
use tendermint::v0_37::abci;
use tracing::instrument;

use crate::state_key::block_scoped;
use crate::{
    component::SwapDataRead, component::SwapDataWrite, event, genesis, state_key,
    BatchSwapOutputData, DexParameters, DirectedTradingPair, SwapExecution, TradingPair,
};

use super::eviction_manager::EvictionManager;
use super::{
    chandelier::Chandelier,
    router::{HandleBatchSwaps, RoutingParams},
    Arbitrage, PositionManager, PositionRead as _, ValueCircuitBreaker,
};

pub struct Dex {}

#[async_trait]
impl Component for Dex {
    type AppState = genesis::Content;

    #[instrument(name = "dex", skip(state, app_state))]
    async fn init_chain<S: StateWrite>(mut state: S, app_state: Option<&Self::AppState>) {
        match app_state {
            None => { /* no-op */ }
            Some(app_state) => {
                state.put_dex_params(app_state.dex_params.clone());
            }
        }
    }

    #[instrument(name = "dex", skip(_state, _begin_block))]
    async fn begin_block<S: StateWrite + 'static>(
        _state: &mut Arc<S>,
        _begin_block: &abci::request::BeginBlock,
    ) {
    }

    #[instrument(name = "dex", skip(state, end_block))]
    async fn end_block<S: StateWrite + 'static>(
        state: &mut Arc<S>,
        end_block: &abci::request::EndBlock,
    ) {
        // F.0. Add all non-native fee payments as swap flows.
        let base_fees_and_tips = {
            let state_ref =
                Arc::get_mut(state).expect("should have unique ref at start of Dex::end_block");

            // Extract the accumulated base fees and tips from the fee component, leaving 0 in its place.
            let base_fees_and_tips = state_ref.take_accumulated_base_fees_and_tips();

            // For each nonnative fee asset, add it in as if it were a chain-submitted swap.
            for (asset_id, (base_fee, tip)) in base_fees_and_tips.iter() {
                if *asset_id == *STAKING_TOKEN_ASSET_ID {
                    continue;
                }
                let pair = TradingPair::new(*asset_id, *STAKING_TOKEN_ASSET_ID);
                // We want to swap all of the fees into the native token, the base/tip distinction
                // just affects where the resulting fees go.
                let total = *base_fee + *tip;
                // DANGEROUS: need to be careful about which side of the pair is which,
                // but the existing API is unsafe and fixing it would be a much larger refactor.
                let flow = if pair.asset_1() == *asset_id {
                    (total, Amount::zero())
                } else {
                    (Amount::zero(), total)
                };
                tracing::debug!(
                    ?asset_id,
                    ?base_fee,
                    ?tip,
                    ?total,
                    ?flow,
                    "inserting chain-submitted swap for alt fee token"
                );

                // Accumulate into the swap flows for this block.
                state_ref
                    .accumulate_swap_flow(&pair, flow.into())
                    .await
                    .expect("should be able to credit DEX VCB");
            }

            // Hold on to the list of base fees and tips so we can claim outputs correctly.
            base_fees_and_tips
        };

        // 1. Add all newly opened positions to the DEX.
        // This has already happened in the action handlers for each `PositionOpen` action.

        // 2. For each batch swap during the block, calculate clearing prices and set in the JMT.
        let routing_params = state.routing_params().await.expect("dex params are set");
        let execution_budget = state
            .get_dex_params()
            .await
            .expect("dex params are set")
            .max_execution_budget;

        // Local cache of BSODs used for claiming fee swaps.
        let mut bsods = BTreeMap::new();

        for (trading_pair, swap_flows) in state.swap_flows() {
            let batch_start = std::time::Instant::now();
            let bsod = state
                .handle_batch_swaps(
                    trading_pair,
                    swap_flows,
                    end_block
                        .height
                        .try_into()
                        .expect("height is part of the end block data"),
                    // Always include both ends of the target pair as fixed candidates.
                    routing_params
                        .clone()
                        .with_extra_candidates([trading_pair.asset_1(), trading_pair.asset_2()]),
                    execution_budget,
                )
                .await
                .expect("handling batch swaps is infaillible");
            metrics::histogram!(crate::component::metrics::DEX_BATCH_DURATION)
                .record(batch_start.elapsed());

            bsods.insert(trading_pair, bsod);
        }

        // F.1. Having performed all batch swaps, "claim" the base fees and tips.
        // The VCB has already been debited through the BSOD.
        {
            let state_ref =
                Arc::get_mut(state).expect("should have unique ref after finishing batch swaps");
            for (asset_id, (base_fee, tip)) in base_fees_and_tips.iter() {
                if *asset_id == *STAKING_TOKEN_ASSET_ID {
                    // In this case, there was nothing to swap, so there's nothing
                    // to claim and we just accumulate the fee we took back into the fee component.
                    state_ref.raw_accumulate_base_fee(Fee::from_staking_token_amount(*base_fee));
                    state_ref.raw_accumulate_tip(Fee::from_staking_token_amount(*tip));
                    continue;
                }
                let pair = TradingPair::new(*asset_id, *STAKING_TOKEN_ASSET_ID);
                let bsod = bsods
                    .get(&pair)
                    .expect("bsod should be present for chain-submitted swap");

                let (base_input, tip_input) = if pair.asset_1() == *asset_id {
                    ((*base_fee, 0u64.into()), (*tip, 0u64.into()))
                } else {
                    ((0u64.into(), *base_fee), (0u64.into(), *tip))
                };

                let base_output = bsod.pro_rata_outputs(base_input);
                let tip_output = bsod.pro_rata_outputs(tip_input);
                tracing::debug!(
                    ?asset_id,
                    ?base_input,
                    ?tip_input,
                    ?base_output,
                    ?tip_output,
                    "claiming chain-submitted swap for alt fee token"
                );

                // Obtain the base fee and tip amounts in the native token, discarding any unfilled amounts.
                let (swapped_base, swapped_tip) = if pair.asset_1() == *asset_id {
                    // If `asset_id` is `R_1` we want to pull the other leg of the pair.
                    (base_output.1, tip_output.1)
                } else {
                    // and vice-versa. `R_1` contains native tokens.
                    (base_output.0, tip_output.0)
                };

                // Finally, accumulate the swapped base fee and tip back into the fee component.
                // (We already took all the fees out).
                state_ref.raw_accumulate_base_fee(Fee::from_staking_token_amount(swapped_base));
                state_ref.raw_accumulate_tip(Fee::from_staking_token_amount(swapped_tip));
            }
        }

        // 3. Perform arbitrage to ensure all prices are consistent post-execution:

        // For arbitrage, we extend the path search by 2 hops to allow a path out of the
        // staking token and back.

        // Extend the fixed candidate set to include recently accessed assets, to have
        // more arbitrage execution against newly opened positions.
        let fixed_candidates = Arc::new(
            routing_params
                .fixed_candidates
                .iter()
                .cloned()
                // The set of recently accessed assets is already limited to avoid
                // potentially blowing up routing time.
                .chain(state.recently_accessed_assets().iter().cloned())
                .collect::<Vec<_>>(),
        );

        let arb_routing_params = RoutingParams {
            max_hops: routing_params.max_hops + 2,
            fixed_candidates,
            price_limit: Some(1u64.into()),
        };

        match state
            .arbitrage(*STAKING_TOKEN_ASSET_ID, arb_routing_params)
            .await
        {
            // The arb search completed successfully, and surfaced some surplus.
            Ok(Some(v)) => tracing::info!(surplus = ?v, "arbitrage successful!"),
            // The arb completed without errors, but resulted in no surplus, so
            // the state fork was discarded.
            Ok(None) => tracing::debug!("no arbitrage found"),
            // The arbitrage search should not error, but if it does, we should
            // simply not perform arbitrage, rather than halting the entire chain.
            Err(e) => tracing::warn!(?e, "error processing arb, this is a bug"),
        }

        // 4. Inspect trading pairs that saw new position opened during this block, and
        // evict their excess LPs if any are found.
        let _ = Arc::get_mut(state)
            .expect("state should be uniquely referenced after batch swaps complete")
            .evict_positions()
            .await
            .map_err(|e| tracing::error!(?e, "error evicting positions, skipping"));

        // 5. Close all positions queued for closure at the end of the block.
        // It's important to do this after execution, to allow block-scoped JIT liquidity.
        Arc::get_mut(state)
            .expect("state should be uniquely referenced after batch swaps complete")
            .close_queued_positions()
            .await
            .expect("closing queued positions should not fail");

        // 5. Finalize the candlestick data for the block.
        Arc::get_mut(state)
            .expect("state should be uniquely referenced after batch swaps complete")
            .finalize_block_candlesticks()
            .await
            .expect("finalizing block candlesticks should not fail");
    }

    #[instrument(name = "dex", skip(_state))]
    async fn end_epoch<S: StateWrite + 'static>(mut _state: &mut Arc<S>) -> Result<()> {
        Ok(())
    }
}

/// Provides public read access to DEX data.
#[async_trait]
pub trait StateReadExt: StateRead {
    /// Gets the DEX parameters from the state.
    async fn get_dex_params(&self) -> Result<DexParameters> {
        self.get(state_key::config::dex_params())
            .await?
            .ok_or_else(|| anyhow::anyhow!("Missing DexParameters"))
    }

    /// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation.
    async fn routing_params(&self) -> Result<RoutingParams> {
        self.get_dex_params().await.map(RoutingParams::from)
    }

    async fn output_data(
        &self,
        height: u64,
        trading_pair: TradingPair,
    ) -> Result<Option<BatchSwapOutputData>> {
        self.get(&state_key::output_data(height, trading_pair))
            .await
    }

    async fn swap_execution(
        &self,
        height: u64,
        trading_pair: DirectedTradingPair,
    ) -> Result<Option<SwapExecution>> {
        self.nonverifiable_get(state_key::swap_execution(height, trading_pair).as_bytes())
            .await
    }

    async fn arb_execution(&self, height: u64) -> Result<Option<SwapExecution>> {
        self.get(&state_key::arb_execution(height)).await
    }

    /// Return a set of [`TradingPair`]s for which liquidity positions were opened
    /// during this block.
    fn get_active_trading_pairs_in_block(&self) -> BTreeSet<TradingPair> {
        self.object_get(block_scoped::active::trading_pairs())
            .unwrap_or_default()
    }
}

impl<T: StateRead + ?Sized> StateReadExt for T {}

/// Extension trait providing write access to dex data.
#[async_trait]
pub trait StateWriteExt: StateWrite {
    fn put_dex_params(&mut self, params: DexParameters) {
        self.put(state_key::config::dex_params().to_string(), params);
    }
}

impl<T: StateWrite + ?Sized> StateWriteExt for T {}

/// The maximum number of "hot" asset identifiers to track for this block.
const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10;

/// Provide write access to internal dex data.
pub(crate) trait InternalDexWrite: StateWrite {
    /// Adds an asset ID to the list of recently accessed assets,
    /// making it a candidate for the current block's arbitrage routing.
    ///
    /// This ensures that assets associated with recently active positions
    /// will be eligible for arbitrage if mispriced positions are opened.
    #[tracing::instrument(level = "debug", skip_all)]
    fn add_recently_accessed_asset(
        &mut self,
        asset_id: asset::Id,
        fixed_candidates: Arc<Vec<asset::Id>>,
    ) {
        let mut assets = self.recently_accessed_assets();

        // Limit the number of recently accessed assets to prevent blowing
        // up routing time.
        if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT {
            return;
        }

        // If the asset is already in the fixed candidate list, don't insert it.
        if fixed_candidates.contains(&asset_id) {
            return;
        }

        assets.insert(asset_id);
        self.object_put(state_key::recently_accessed_assets(), assets);
    }

    /// Mark a [`TradingPair`] as active during this block.
    fn mark_trading_pair_as_active(&mut self, pair: TradingPair) {
        let mut active_pairs = self.get_active_trading_pairs_in_block();

        if active_pairs.insert(pair) {
            self.object_put(block_scoped::active::trading_pairs(), active_pairs)
        }
    }

    async fn set_output_data(
        &mut self,
        output_data: BatchSwapOutputData,
        swap_execution_1_for_2: Option<SwapExecution>,
        swap_execution_2_for_1: Option<SwapExecution>,
    ) -> Result<()> {
        // Debit the DEX for the swap outflows.
        // Note that since we credited the DEX for _all_ inflows, we need to debit the
        // unfilled amounts as well as the filled amounts.
        //
        // In the case of a value inflation bug, the debit call will return an underflow
        // error, which will halt the chain.
        self.dex_vcb_debit(Value {
            amount: output_data.unfilled_1 + output_data.lambda_1,
            asset_id: output_data.trading_pair.asset_1,
        })
        .await?;
        self.dex_vcb_debit(Value {
            amount: output_data.unfilled_2 + output_data.lambda_2,
            asset_id: output_data.trading_pair.asset_2,
        })
        .await?;

        // Write the output data to the state under a known key, for querying, ...
        let height = output_data.height;
        let trading_pair = output_data.trading_pair;
        self.put(state_key::output_data(height, trading_pair), output_data);

        // Store the swap executions for both directions in the state as well.
        if let Some(swap_execution) = swap_execution_1_for_2.clone() {
            let tp_1_for_2 = DirectedTradingPair::new(trading_pair.asset_1, trading_pair.asset_2);
            self.put_swap_execution_at_height(height, tp_1_for_2, swap_execution);
        }
        if let Some(swap_execution) = swap_execution_2_for_1.clone() {
            let tp_2_for_1 = DirectedTradingPair::new(trading_pair.asset_2, trading_pair.asset_1);
            self.put_swap_execution_at_height(height, tp_2_for_1, swap_execution);
        }

        // ... and also add it to the set in the compact block to be pushed out to clients.
        let mut outputs = self.pending_batch_swap_outputs();
        outputs.insert(trading_pair, output_data);
        self.object_put(state_key::pending_outputs(), outputs);

        // Also generate an ABCI event for indexing:
        self.record_proto(
            event::EventBatchSwap {
                batch_swap_output_data: output_data,
                swap_execution_1_for_2,
                swap_execution_2_for_1,
            }
            .to_proto(),
        );

        Ok(())
    }

    fn set_arb_execution(&mut self, height: u64, execution: SwapExecution) {
        self.put(state_key::arb_execution(height), execution);
    }
}

impl<T: StateWrite + ?Sized> InternalDexWrite for T {}