// penumbra_dex/component/circuit_breaker/value.rs

use anyhow::{anyhow, Result};
use cnidarium::{StateRead, StateWrite};
use penumbra_asset::{asset, Value};
use penumbra_num::Amount;
use penumbra_proto::{DomainType, StateReadProto, StateWriteProto};
use tonic::async_trait;
use tracing::instrument;

use crate::{event, state_key};

/// Read-only access to the DEX value circuit breaker (VCB) balances.
#[async_trait]
pub trait ValueCircuitBreakerRead: StateRead {
    /// Fetch the DEX VCB balance for a specified asset id.
    ///
    /// Looks up the entry stored under [`state_key::value_balance`] for `id`.
    ///
    /// # Returns
    ///
    /// `Ok(Some(balance))` when the lookup yields an entry, `Ok(None)` when it
    /// yields none.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the underlying state read.
    async fn get_dex_vcb_for_asset(&self, id: &asset::Id) -> Result<Option<Amount>> {
        // `id` is already a reference, so hand it over directly instead of re-borrowing it.
        Ok(self.get(&state_key::value_balance(id)).await?)
    }
}

// Blanket implementation: any `StateRead` type, sized or not, gets the VCB read accessors.
impl<T> ValueCircuitBreakerRead for T where T: StateRead + ?Sized {}

/// Tracks the aggregate value of deposits in the DEX.
///
/// Each asset has its own balance, stored under [`state_key::value_balance`].
/// Credits increase that balance and debits decrease it; a debit larger than
/// the recorded balance is rejected with an error instead of being written.
#[async_trait]
pub(crate) trait ValueCircuitBreaker: StateWrite {
    /// Credits the supplied [`Value`] to the dex VCB.
    ///
    /// A zero-amount credit returns early: nothing is written and no event is
    /// recorded. Otherwise the updated balance is written to state and an
    /// [`event::EventValueCircuitBreakerCredit`] carrying the previous and new
    /// balance is recorded.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the current balance fails, or if adding
    /// `value.amount` to it overflows.
    #[instrument(skip(self))]
    async fn dex_vcb_credit(&mut self, value: Value) -> Result<()> {
        // Nothing to account for; avoid a state write and a no-op event.
        if value.amount == Amount::zero() {
            return Ok(());
        }

        // An asset with no recorded balance starts from the default amount.
        let prev_balance: Amount = self
            .get_dex_vcb_for_asset(&value.asset_id)
            .await?
            .unwrap_or_default();
        let new_balance = prev_balance
            .checked_add(&value.amount)
            .ok_or_else(|| anyhow!("overflowed balance while crediting value circuit breaker (prev balance={prev_balance:?}, credit={value:?})"))?;

        tracing::debug!(?prev_balance, ?new_balance, "crediting the dex VCB");
        self.put(state_key::value_balance(&value.asset_id), new_balance);

        self.record_proto(
            event::EventValueCircuitBreakerCredit {
                asset_id: value.asset_id,
                previous_balance: prev_balance,
                new_balance,
            }
            .to_proto(),
        );
        Ok(())
    }

    /// Debits the specified [`Value`] from the dex VCB.
    ///
    /// A zero-amount debit returns early: nothing is written and no event is
    /// recorded. Otherwise the updated balance is written to state and an
    /// [`event::EventValueCircuitBreakerDebit`] carrying the previous and new
    /// balance is recorded.
    ///
    /// # Errors
    ///
    /// Returns an error if reading the current balance fails, or if
    /// `value.amount` exceeds the recorded balance (underflow). An asset with
    /// no recorded balance is debited against the default amount, so the tests
    /// in this file expect any non-zero debit of a never-credited asset to fail.
    #[instrument(skip(self))]
    async fn dex_vcb_debit(&mut self, value: Value) -> Result<()> {
        // Nothing to account for; avoid a state write and a no-op event.
        if value.amount == Amount::zero() {
            return Ok(());
        }

        // An asset with no recorded balance starts from the default amount.
        let prev_balance: Amount = self
            .get_dex_vcb_for_asset(&value.asset_id)
            .await?
            .unwrap_or_default();
        // This is the circuit breaker itself: refuse to let more value leave
        // than was recorded as entering.
        let new_balance = prev_balance
            .checked_sub(&value.amount)
            .ok_or_else(|| anyhow!("underflowed balance while debiting value circuit breaker (prev balance={prev_balance:?}, debit={value:?})"))?;

        tracing::debug!(?prev_balance, ?new_balance, "debiting the dex VCB");
        self.put(state_key::value_balance(&value.asset_id), new_balance);

        self.record_proto(
            event::EventValueCircuitBreakerDebit {
                asset_id: value.asset_id,
                previous_balance: prev_balance,
                new_balance,
            }
            .to_proto(),
        );
        Ok(())
    }
}

// Blanket implementation: any `StateWrite` type, sized or not, can credit and debit the VCB.
impl<T> ValueCircuitBreaker for T where T: StateWrite + ?Sized {}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::component::position_manager::price_index::PositionByPriceIndex;
    use crate::component::router::HandleBatchSwaps as _;
    use crate::component::{InternalDexWrite, StateReadExt as _, SwapDataRead, SwapDataWrite};
    use crate::lp::plan::PositionWithdrawPlan;
    use crate::{
        component::{router::create_buy, tests::TempStorageExt},
        state_key, DirectedUnitPair,
    };
    use crate::{BatchSwapOutputData, PositionOpen};
    use cnidarium::{ArcStateDeltaExt as _, StateDelta, TempStorage};
    use cnidarium_component::ActionHandler as _;
    use penumbra_asset::asset;
    use penumbra_num::Amount;
    use penumbra_proto::StateWriteProto as _;
    use penumbra_sct::component::clock::EpochManager as _;
    use penumbra_sct::component::source::SourceContext as _;
    use penumbra_sct::epoch::Epoch;

    use super::*;

    /// Exercises the VCB bookkeeping directly: balanced credits and debits
    /// succeed, while any debit beyond the recorded balance is rejected.
    #[tokio::test]
    async fn value_circuit_breaker() -> anyhow::Result<()> {
        let _ = tracing_subscriber::fmt::try_init();
        let storage = TempStorage::new().await?.apply_minimal_genesis().await?;
        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
        let mut state_tx = state.try_begin_transaction().unwrap();

        // Resolve a unit by name from the set of known assets.
        let unit = |name: &str| asset::Cache::with_known_assets().get_unit(name).unwrap();
        let gm = unit("gm");
        let gn = unit("gn");
        let test_usd = unit("test_usd");

        // A credit followed by a debit of the same amount should succeed.
        // Credit 100 gm, then 100 gn.
        for asset_unit in [&gm, &gn] {
            state_tx
                .dex_vcb_credit(asset_unit.value(100u64.into()))
                .await?;
        }
        // Debit 100 gm, then 100 gn.
        for asset_unit in [&gm, &gn] {
            state_tx
                .dex_vcb_debit(asset_unit.value(100u64.into()))
                .await?;
        }

        // Debiting an additional gm should fail.
        let overdrawn_gm = state_tx.dex_vcb_debit(gm.value(1u64.into())).await;
        assert!(overdrawn_gm.is_err());

        // Debiting an asset that hasn't been credited should also fail.
        let never_credited = state_tx.dex_vcb_debit(test_usd.value(1u64.into())).await;
        assert!(never_credited.is_err());

        Ok(())
    }

    /// Opens a position through the `PositionOpen` action handler, then checks
    /// that a `PositionWithdraw` for that position is rejected.
    #[tokio::test]
    async fn position_value_circuit_breaker() -> anyhow::Result<()> {
        let _ = tracing_subscriber::fmt::try_init();
        let storage = TempStorage::new().await?.apply_minimal_genesis().await?;
        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
        let mut state_tx = state.try_begin_transaction().unwrap();

        let height = 1;

        // 1. Simulate BeginBlock

        // Record the epoch and block height the action handlers will see.
        state_tx.put_epoch_by_height(
            height,
            Epoch {
                index: 0,
                start_height: 0,
            },
        );
        state_tx.put_block_height(height);
        state_tx.apply();

        let gm = asset::Cache::with_known_assets().get_unit("gm").unwrap();
        let gn = asset::Cache::with_known_assets().get_unit("gn").unwrap();

        let pair_1 = DirectedUnitPair::new(gm.clone(), gn.clone());

        let one = 1u64.into();
        let price1 = one;
        // Create a position buying 1 gm with 1 gn (i.e. reserves will be 1gn).
        let buy_1 = create_buy(pair_1.clone(), 1u64.into(), price1);

        // Create the PositionOpen action
        let pos_open = PositionOpen {
            position: buy_1.clone(),
        };

        // Execute the PositionOpen action.
        pos_open.check_stateless(()).await?;
        pos_open.check_historical(state.clone()).await?;
        // Run the stateful part in its own transaction and commit it.
        let mut state_tx = state.try_begin_transaction().unwrap();
        state_tx.put_mock_source(1u8);
        pos_open.check_and_execute(&mut state_tx).await?;
        state_tx.apply();

        // Set the output data for the block to 1 gn and 0 gm.
        // This should not error, the circuit breaker should not trip.
        //
        // NOTE(review): this transaction is shadowed at the PositionWithdraw step
        // below without an `apply()` call, so the output data and the balance
        // overwrite written through it may never reach `state` — confirm this
        // is intended.
        let mut state_tx = state.try_begin_transaction().unwrap();
        state_tx
            .set_output_data(
                BatchSwapOutputData {
                    delta_1: 0u64.into(),
                    delta_2: 1u64.into(),
                    lambda_1: 0u64.into(),
                    lambda_2: 0u64.into(),
                    unfilled_1: 0u64.into(),
                    unfilled_2: 0u64.into(),
                    height: 1,
                    trading_pair: pair_1.into_directed_trading_pair().into(),
                    sct_position_prefix: Default::default(),
                },
                None,
                None,
            )
            .await?;

        // Pretend the position was overfilled.

        // Wipe out the gm value in the circuit breaker, so that any outflows should trip it.
        //
        // NOTE(review): the position's reserves are described above as 1gn, while
        // this zeroes the gm balance — confirm which asset the withdrawal is
        // expected to debit.
        state_tx.put(state_key::value_balance(&gm.id()), Amount::from(0u64));

        // Create the PositionWithdraw action
        let pos_withdraw_plan = PositionWithdrawPlan {
            position_id: buy_1.id(),
            reserves: buy_1.reserves,
            sequence: 1,
            pair: pair_1.into_directed_trading_pair().into(),
            rewards: vec![],
        };

        let pos_withdraw = pos_withdraw_plan.position_withdraw();

        // Execute the PositionWithdraw action.
        pos_withdraw.check_stateless(()).await?;
        pos_withdraw.check_historical(state.clone()).await?;
        // Fresh transaction for the withdrawal; it shadows the one opened above.
        let mut state_tx = state.try_begin_transaction().unwrap();
        state_tx.put_mock_source(1u8);
        // This should error, since there is no balance available to withdraw the position.
        assert!(pos_withdraw.check_and_execute(&mut state_tx).await.is_err());
        state_tx.apply();

        Ok(())
    }

    /// A batch swap that fills against a position the circuit breaker was never
    /// told about is expected to panic with the VCB underflow error.
    #[tokio::test]
    #[should_panic(expected = "underflowed balance while debiting value circuit breaker")]
    async fn batch_swap_circuit_breaker() {
        let _ = tracing_subscriber::fmt::try_init();
        let fresh_storage = TempStorage::new().await.expect("able to create storage");
        let storage = fresh_storage
            .apply_minimal_genesis()
            .await
            .expect("able to apply genesis");
        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
        let mut state_tx = state.try_begin_transaction().unwrap();

        // Resolve a unit by name from the set of known assets.
        let unit = |name: &str| asset::Cache::with_known_assets().get_unit(name).unwrap();
        let gm = unit("gm");
        let gn = unit("gn");

        let pair_1 = DirectedUnitPair::new(gm.clone(), gn);

        // Write the position into state by hand instead of going through
        // `put_position`, so that the circuit breaker never learns about its
        // value. Handling a batch swap that fills against this position should
        // then result in an error.
        //
        // The position buys 1 gm with 1 gn (i.e. reserves will be 1gn).
        let position = create_buy(pair_1.clone(), 1u64.into(), 1u64.into());
        let id = position.id();

        state_tx
            .update_position_by_price_index(&id, &None, &position)
            .expect("can update price index");
        state_tx.put(state_key::position_by_id(&id), position);

        // Now there's a position in the state, but the circuit breaker is not aware of it.
        let trading_pair = pair_1.into_directed_trading_pair().into();
        let mut swap_flow = state_tx.swap_flow(&trading_pair);

        assert!(trading_pair.asset_1() == gm.id());

        // Add the amount of each asset being swapped to the batch swap flow.
        swap_flow.0 += gm.value(5u32.into()).amount;
        swap_flow.1 += 0u32.into();

        // Set the batch swap flow for the trading pair.
        let accumulated = state_tx
            .accumulate_swap_flow(&trading_pair, swap_flow.clone())
            .await;
        accumulated.unwrap();
        state_tx.apply();

        let routing_params = state.routing_params().await.unwrap();
        let dex_params = state.get_dex_params().await.unwrap();
        let max_execution = dex_params.max_execution_budget;

        // This call should panic due to the outflow of gn not being covered by the circuit breaker.
        let outcome = state
            .handle_batch_swaps(trading_pair, swap_flow, 0, routing_params, max_execution)
            .await;
        outcome.expect("unable to process batch swaps");
    }
}