penumbra_sdk_dex/component/circuit_breaker/
value.rs

1use anyhow::{anyhow, Result};
2use cnidarium::{StateRead, StateWrite};
3use penumbra_sdk_asset::{asset, Value};
4use penumbra_sdk_num::Amount;
5use penumbra_sdk_proto::{DomainType, StateReadProto, StateWriteProto};
6use tonic::async_trait;
7use tracing::instrument;
8
9use crate::{event, state_key};
10
11#[async_trait]
12pub trait ValueCircuitBreakerRead: StateRead {
13    /// Fetch the DEX VCB balance for a specified asset id.
14    async fn get_dex_vcb_for_asset(&self, id: &asset::Id) -> Result<Option<Amount>> {
15        Ok(self.get(&state_key::value_balance(&id)).await?)
16    }
17}
18
19impl<T: StateRead + ?Sized> ValueCircuitBreakerRead for T {}
20
21/// Tracks the aggregate value of deposits in the DEX.
22#[async_trait]
23pub(crate) trait ValueCircuitBreaker: StateWrite {
24    /// Credits the supplied [`Value`] to the dex VCB.
25    #[instrument(skip(self))]
26    async fn dex_vcb_credit(&mut self, value: Value) -> Result<()> {
27        if value.amount == Amount::zero() {
28            return Ok(());
29        }
30
31        let prev_balance: Amount = self
32            .get_dex_vcb_for_asset(&value.asset_id)
33            .await?
34            .unwrap_or_default();
35        let new_balance = prev_balance
36            .checked_add(&value.amount)
37            .ok_or_else(|| anyhow!("overflowed balance while crediting value circuit breaker (prev balance={prev_balance:?}, credit={value:?}"))?;
38
39        tracing::debug!(?prev_balance, ?new_balance, "crediting the dex VCB");
40        self.put(state_key::value_balance(&value.asset_id), new_balance);
41
42        self.record_proto(
43            event::EventValueCircuitBreakerCredit {
44                asset_id: value.asset_id,
45                previous_balance: prev_balance,
46                new_balance,
47            }
48            .to_proto(),
49        );
50        Ok(())
51    }
52
53    /// Debits the specified [`Value`] from the dex VCB.
54    #[instrument(skip(self))]
55    async fn dex_vcb_debit(&mut self, value: Value) -> Result<()> {
56        if value.amount == Amount::zero() {
57            return Ok(());
58        }
59
60        let prev_balance: Amount = self
61            .get_dex_vcb_for_asset(&value.asset_id)
62            .await?
63            .unwrap_or_default();
64        let new_balance = prev_balance
65            .checked_sub(&value.amount)
66            .ok_or_else(|| anyhow!("underflowed balance while debiting value circuit breaker (prev balance={prev_balance:?}, debit={value:?}"))?;
67
68        tracing::debug!(?prev_balance, ?new_balance, "crediting the dex VCB");
69        self.put(state_key::value_balance(&value.asset_id), new_balance);
70
71        self.record_proto(
72            event::EventValueCircuitBreakerDebit {
73                asset_id: value.asset_id,
74                previous_balance: prev_balance,
75                new_balance,
76            }
77            .to_proto(),
78        );
79        Ok(())
80    }
81}
82
83impl<T: StateWrite + ?Sized> ValueCircuitBreaker for T {}
84
85#[cfg(test)]
86mod tests {
87    use std::sync::Arc;
88
89    use crate::component::position_manager::price_index::PositionByPriceIndex;
90    use crate::component::router::HandleBatchSwaps as _;
91    use crate::component::{InternalDexWrite, StateReadExt as _, SwapDataRead, SwapDataWrite};
92    use crate::lp::plan::PositionWithdrawPlan;
93    use crate::{
94        component::{router::create_buy, tests::TempStorageExt},
95        state_key, DirectedUnitPair,
96    };
97    use crate::{BatchSwapOutputData, PositionOpen};
98    use cnidarium::{ArcStateDeltaExt as _, StateDelta, TempStorage};
99    use cnidarium_component::ActionHandler as _;
100    use penumbra_sdk_asset::asset;
101    use penumbra_sdk_num::Amount;
102    use penumbra_sdk_proto::StateWriteProto as _;
103    use penumbra_sdk_sct::component::clock::EpochManager as _;
104    use penumbra_sdk_sct::component::source::SourceContext as _;
105    use penumbra_sdk_sct::epoch::Epoch;
106
107    use super::*;
108
109    #[tokio::test]
110    async fn value_circuit_breaker() -> anyhow::Result<()> {
111        let _ = tracing_subscriber::fmt::try_init();
112        let storage = TempStorage::new().await?.apply_minimal_genesis().await?;
113        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
114        let mut state_tx = state.try_begin_transaction().unwrap();
115
116        let gm = asset::Cache::with_known_assets().get_unit("gm").unwrap();
117        let gn = asset::Cache::with_known_assets().get_unit("gn").unwrap();
118        let test_usd = asset::Cache::with_known_assets()
119            .get_unit("test_usd")
120            .unwrap();
121
122        // A credit followed by a debit of the same amount should succeed.
123        // Credit 100 gm.
124        state_tx.dex_vcb_credit(gm.value(100u64.into())).await?;
125        // Credit 100 gn.
126        state_tx.dex_vcb_credit(gn.value(100u64.into())).await?;
127
128        // Debit 100 gm.
129        state_tx.dex_vcb_debit(gm.value(100u64.into())).await?;
130        // Debit 100 gn.
131        state_tx.dex_vcb_debit(gn.value(100u64.into())).await?;
132
133        // Debiting an additional gm should fail.
134        assert!(state_tx.dex_vcb_debit(gm.value(1u64.into())).await.is_err());
135
136        // Debiting an asset that hasn't been credited should also fail.
137        assert!(state_tx
138            .dex_vcb_debit(test_usd.value(1u64.into()))
139            .await
140            .is_err());
141
142        Ok(())
143    }
144
    /// Opens a position through the `PositionOpen` action handler and then
    /// checks that a subsequent `PositionWithdraw` of that position is rejected.
    ///
    /// The test walks through several short-lived transactions on the same
    /// `Arc<StateDelta<_>>`; each one is started with `try_begin_transaction`
    /// and, where its writes are meant to persist, finished with `apply()`.
    #[tokio::test]
    async fn position_value_circuit_breaker() -> anyhow::Result<()> {
        let _ = tracing_subscriber::fmt::try_init();
        let storage = TempStorage::new().await?.apply_minimal_genesis().await?;
        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
        let mut state_tx = state.try_begin_transaction().unwrap();

        let height = 1;

        // 1. Simulate BeginBlock

        // Record the epoch and block height for height 1, then apply the transaction.
        state_tx.put_epoch_by_height(
            height,
            Epoch {
                index: 0,
                start_height: 0,
            },
        );
        state_tx.put_block_height(height);
        state_tx.apply();

        let gm = asset::Cache::with_known_assets().get_unit("gm").unwrap();
        let gn = asset::Cache::with_known_assets().get_unit("gn").unwrap();

        let pair_1 = DirectedUnitPair::new(gm.clone(), gn.clone());

        let one = 1u64.into();
        let price1 = one;
        // Create a position buying 1 gm with 1 gn (i.e. reserves will be 1gn).
        let buy_1 = create_buy(pair_1.clone(), 1u64.into(), price1);

        // Create the PositionOpen action
        let pos_open = PositionOpen {
            position: buy_1.clone(),
        };

        // Execute the PositionOpen action.
        // The stateless and historical checks run first; execution happens in a fresh
        // transaction with a mock source set, and is then applied.
        pos_open.check_stateless(()).await?;
        pos_open.check_historical(state.clone()).await?;
        let mut state_tx = state.try_begin_transaction().unwrap();
        state_tx.put_mock_source(1u8);
        pos_open.check_and_execute(&mut state_tx).await?;
        state_tx.apply();

        // Set the output data for the block to 1 gn and 0 gm.
        // This should not error, the circuit breaker should not trip.
        // NOTE(review): the transaction started here is shadowed further down without a
        // visible `apply()` call, so its writes (this output data and the zeroed gm balance
        // below) may never reach `state` -- confirm whether that is intended.
        let mut state_tx = state.try_begin_transaction().unwrap();
        state_tx
            .set_output_data(
                BatchSwapOutputData {
                    delta_1: 0u64.into(),
                    delta_2: 1u64.into(),
                    lambda_1: 0u64.into(),
                    lambda_2: 0u64.into(),
                    unfilled_1: 0u64.into(),
                    unfilled_2: 0u64.into(),
                    height: 1,
                    trading_pair: pair_1.into_directed_trading_pair().into(),
                    sct_position_prefix: Default::default(),
                },
                None,
                None,
            )
            .await?;

        // Pretend the position was overfilled.

        // Wipe out the gm value in the circuit breaker, so that any outflows should trip it.
        state_tx.put(state_key::value_balance(&gm.id()), Amount::from(0u64));

        // Create the PositionWithdraw action
        // The plan withdraws the position's full reserves, with no rewards.
        let pos_withdraw_plan = PositionWithdrawPlan {
            position_id: buy_1.id(),
            reserves: buy_1.reserves,
            sequence: 1,
            pair: pair_1.into_directed_trading_pair().into(),
            rewards: vec![],
        };

        let pos_withdraw = pos_withdraw_plan.position_withdraw();

        // Execute the PositionWithdraw action.
        pos_withdraw.check_stateless(()).await?;
        pos_withdraw.check_historical(state.clone()).await?;
        let mut state_tx = state.try_begin_transaction().unwrap();
        state_tx.put_mock_source(1u8);
        // This should error, since there is no balance available to withdraw the position.
        // NOTE(review): the assertion only checks `is_err()`; it does not pin the failure to
        // the circuit breaker specifically -- consider asserting on the error text.
        assert!(pos_withdraw.check_and_execute(&mut state_tx).await.is_err());
        state_tx.apply();

        Ok(())
    }
237
    /// Checks that processing a batch swap against a position the circuit
    /// breaker never accounted for fails with the VCB underflow error.
    ///
    /// The test is expected to panic: the final `expect` unwraps the result of
    /// `handle_batch_swaps`, and `should_panic` requires the panic message to
    /// contain the prefix of the error produced by `dex_vcb_debit`. Changing
    /// that error message prefix will break this test.
    #[tokio::test]
    #[should_panic(expected = "underflowed balance while debiting value circuit breaker")]
    async fn batch_swap_circuit_breaker() {
        let _ = tracing_subscriber::fmt::try_init();
        let storage = TempStorage::new()
            .await
            .expect("able to create storage")
            .apply_minimal_genesis()
            .await
            .expect("able to apply genesis");
        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
        let mut state_tx = state.try_begin_transaction().unwrap();

        let gm = asset::Cache::with_known_assets().get_unit("gm").unwrap();
        let gn = asset::Cache::with_known_assets().get_unit("gn").unwrap();

        let pair_1 = DirectedUnitPair::new(gm.clone(), gn.clone());

        // Manually put a position without calling `put_position` so that the
        // circuit breaker is not aware of the position's value. Then, handling a batch
        // swap that fills against this position should result in an error.
        let one = 1u64.into();
        let price1 = one;
        // Create a position buying 1 gm with 1 gn (i.e. reserves will be 1gn).
        let buy_1 = create_buy(pair_1.clone(), 1u64.into(), price1);

        // Capture the id before `buy_1` is moved into `position`.
        let id = buy_1.id();

        let position = buy_1;
        // Index the position by price and store it by id directly; no VCB credit is made here.
        state_tx
            .update_position_by_price_index(&position.id(), &None, &position)
            .expect("can update price index");
        state_tx.put(state_key::position_by_id(&id), position);

        // Now there's a position in the state, but the circuit breaker is not aware of it.
        let trading_pair = pair_1.into_directed_trading_pair().into();
        let mut swap_flow = state_tx.swap_flow(&trading_pair);

        // Sanity check: gm is asset 1 of the pair, so `swap_flow.0` below is the gm input.
        assert!(trading_pair.asset_1() == gm.id());

        // Add the amount of each asset being swapped to the batch swap flow.
        swap_flow.0 += gm.value(5u32.into()).amount;
        swap_flow.1 += 0u32.into();

        // Set the batch swap flow for the trading pair.
        state_tx
            .accumulate_swap_flow(&trading_pair, swap_flow.clone())
            .await
            .unwrap();
        state_tx.apply();

        let routing_params = state.routing_params().await.unwrap();
        let max_execution = state.get_dex_params().await.unwrap().max_execution_budget;
        // This call should panic due to the outflow of gn not being covered by the circuit breaker.
        state
            .handle_batch_swaps(trading_pair, swap_flow, 0, routing_params, max_execution)
            .await
            .expect("unable to process batch swaps");
    }
297}