penumbra_sdk_dex/component/circuit_breaker/
value.rs1use anyhow::{anyhow, Result};
2use cnidarium::{StateRead, StateWrite};
3use penumbra_sdk_asset::{asset, Value};
4use penumbra_sdk_num::Amount;
5use penumbra_sdk_proto::{DomainType, StateReadProto, StateWriteProto};
6use tonic::async_trait;
7use tracing::instrument;
8
9use crate::{event, state_key};
10
11#[async_trait]
12pub trait ValueCircuitBreakerRead: StateRead {
13 async fn get_dex_vcb_for_asset(&self, id: &asset::Id) -> Result<Option<Amount>> {
15 Ok(self.get(&state_key::value_balance(&id)).await?)
16 }
17}
18
19impl<T: StateRead + ?Sized> ValueCircuitBreakerRead for T {}
20
21#[async_trait]
23pub(crate) trait ValueCircuitBreaker: StateWrite {
24 #[instrument(skip(self))]
26 async fn dex_vcb_credit(&mut self, value: Value) -> Result<()> {
27 if value.amount == Amount::zero() {
28 return Ok(());
29 }
30
31 let prev_balance: Amount = self
32 .get_dex_vcb_for_asset(&value.asset_id)
33 .await?
34 .unwrap_or_default();
35 let new_balance = prev_balance
36 .checked_add(&value.amount)
37 .ok_or_else(|| anyhow!("overflowed balance while crediting value circuit breaker (prev balance={prev_balance:?}, credit={value:?}"))?;
38
39 tracing::debug!(?prev_balance, ?new_balance, "crediting the dex VCB");
40 self.put(state_key::value_balance(&value.asset_id), new_balance);
41
42 self.record_proto(
43 event::EventValueCircuitBreakerCredit {
44 asset_id: value.asset_id,
45 previous_balance: prev_balance,
46 new_balance,
47 }
48 .to_proto(),
49 );
50 Ok(())
51 }
52
53 #[instrument(skip(self))]
55 async fn dex_vcb_debit(&mut self, value: Value) -> Result<()> {
56 if value.amount == Amount::zero() {
57 return Ok(());
58 }
59
60 let prev_balance: Amount = self
61 .get_dex_vcb_for_asset(&value.asset_id)
62 .await?
63 .unwrap_or_default();
64 let new_balance = prev_balance
65 .checked_sub(&value.amount)
66 .ok_or_else(|| anyhow!("underflowed balance while debiting value circuit breaker (prev balance={prev_balance:?}, debit={value:?}"))?;
67
68 tracing::debug!(?prev_balance, ?new_balance, "crediting the dex VCB");
69 self.put(state_key::value_balance(&value.asset_id), new_balance);
70
71 self.record_proto(
72 event::EventValueCircuitBreakerDebit {
73 asset_id: value.asset_id,
74 previous_balance: prev_balance,
75 new_balance,
76 }
77 .to_proto(),
78 );
79 Ok(())
80 }
81}
82
83impl<T: StateWrite + ?Sized> ValueCircuitBreaker for T {}
84
85#[cfg(test)]
86mod tests {
87 use std::sync::Arc;
88
89 use crate::component::position_manager::price_index::PositionByPriceIndex;
90 use crate::component::router::HandleBatchSwaps as _;
91 use crate::component::{InternalDexWrite, StateReadExt as _, SwapDataRead, SwapDataWrite};
92 use crate::lp::plan::PositionWithdrawPlan;
93 use crate::{
94 component::{router::create_buy, tests::TempStorageExt},
95 state_key, DirectedUnitPair,
96 };
97 use crate::{BatchSwapOutputData, PositionOpen};
98 use cnidarium::{ArcStateDeltaExt as _, StateDelta, TempStorage};
99 use cnidarium_component::ActionHandler as _;
100 use penumbra_sdk_asset::asset;
101 use penumbra_sdk_num::Amount;
102 use penumbra_sdk_proto::StateWriteProto as _;
103 use penumbra_sdk_sct::component::clock::EpochManager as _;
104 use penumbra_sdk_sct::component::source::SourceContext as _;
105 use penumbra_sdk_sct::epoch::Epoch;
106
107 use super::*;
108
109 #[tokio::test]
110 async fn value_circuit_breaker() -> anyhow::Result<()> {
111 let _ = tracing_subscriber::fmt::try_init();
112 let storage = TempStorage::new().await?.apply_minimal_genesis().await?;
113 let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
114 let mut state_tx = state.try_begin_transaction().unwrap();
115
116 let gm = asset::Cache::with_known_assets().get_unit("gm").unwrap();
117 let gn = asset::Cache::with_known_assets().get_unit("gn").unwrap();
118 let test_usd = asset::Cache::with_known_assets()
119 .get_unit("test_usd")
120 .unwrap();
121
122 state_tx.dex_vcb_credit(gm.value(100u64.into())).await?;
125 state_tx.dex_vcb_credit(gn.value(100u64.into())).await?;
127
128 state_tx.dex_vcb_debit(gm.value(100u64.into())).await?;
130 state_tx.dex_vcb_debit(gn.value(100u64.into())).await?;
132
133 assert!(state_tx.dex_vcb_debit(gm.value(1u64.into())).await.is_err());
135
136 assert!(state_tx
138 .dex_vcb_debit(test_usd.value(1u64.into()))
139 .await
140 .is_err());
141
142 Ok(())
143 }
144
145 #[tokio::test]
146 async fn position_value_circuit_breaker() -> anyhow::Result<()> {
147 let _ = tracing_subscriber::fmt::try_init();
148 let storage = TempStorage::new().await?.apply_minimal_genesis().await?;
149 let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
150 let mut state_tx = state.try_begin_transaction().unwrap();
151
152 let height = 1;
153
154 state_tx.put_epoch_by_height(
157 height,
158 Epoch {
159 index: 0,
160 start_height: 0,
161 },
162 );
163 state_tx.put_block_height(height);
164 state_tx.apply();
165
166 let gm = asset::Cache::with_known_assets().get_unit("gm").unwrap();
167 let gn = asset::Cache::with_known_assets().get_unit("gn").unwrap();
168
169 let pair_1 = DirectedUnitPair::new(gm.clone(), gn.clone());
170
171 let one = 1u64.into();
172 let price1 = one;
173 let buy_1 = create_buy(pair_1.clone(), 1u64.into(), price1);
175
176 let pos_open = PositionOpen {
178 position: buy_1.clone(),
179 };
180
181 pos_open.check_stateless(()).await?;
183 pos_open.check_historical(state.clone()).await?;
184 let mut state_tx = state.try_begin_transaction().unwrap();
185 state_tx.put_mock_source(1u8);
186 pos_open.check_and_execute(&mut state_tx).await?;
187 state_tx.apply();
188
189 let mut state_tx = state.try_begin_transaction().unwrap();
192 state_tx
193 .set_output_data(
194 BatchSwapOutputData {
195 delta_1: 0u64.into(),
196 delta_2: 1u64.into(),
197 lambda_1: 0u64.into(),
198 lambda_2: 0u64.into(),
199 unfilled_1: 0u64.into(),
200 unfilled_2: 0u64.into(),
201 height: 1,
202 trading_pair: pair_1.into_directed_trading_pair().into(),
203 sct_position_prefix: Default::default(),
204 },
205 None,
206 None,
207 )
208 .await?;
209
210 state_tx.put(state_key::value_balance(&gm.id()), Amount::from(0u64));
214
215 let pos_withdraw_plan = PositionWithdrawPlan {
217 position_id: buy_1.id(),
218 reserves: buy_1.reserves,
219 sequence: 1,
220 pair: pair_1.into_directed_trading_pair().into(),
221 rewards: vec![],
222 };
223
224 let pos_withdraw = pos_withdraw_plan.position_withdraw();
225
226 pos_withdraw.check_stateless(()).await?;
228 pos_withdraw.check_historical(state.clone()).await?;
229 let mut state_tx = state.try_begin_transaction().unwrap();
230 state_tx.put_mock_source(1u8);
231 assert!(pos_withdraw.check_and_execute(&mut state_tx).await.is_err());
233 state_tx.apply();
234
235 Ok(())
236 }
237
238 #[tokio::test]
239 #[should_panic(expected = "underflowed balance while debiting value circuit breaker")]
240 async fn batch_swap_circuit_breaker() {
241 let _ = tracing_subscriber::fmt::try_init();
242 let storage = TempStorage::new()
243 .await
244 .expect("able to create storage")
245 .apply_minimal_genesis()
246 .await
247 .expect("able to apply genesis");
248 let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
249 let mut state_tx = state.try_begin_transaction().unwrap();
250
251 let gm = asset::Cache::with_known_assets().get_unit("gm").unwrap();
252 let gn = asset::Cache::with_known_assets().get_unit("gn").unwrap();
253
254 let pair_1 = DirectedUnitPair::new(gm.clone(), gn.clone());
255
256 let one = 1u64.into();
260 let price1 = one;
261 let buy_1 = create_buy(pair_1.clone(), 1u64.into(), price1);
263
264 let id = buy_1.id();
265
266 let position = buy_1;
267 state_tx
268 .update_position_by_price_index(&position.id(), &None, &position)
269 .expect("can update price index");
270 state_tx.put(state_key::position_by_id(&id), position);
271
272 let trading_pair = pair_1.into_directed_trading_pair().into();
274 let mut swap_flow = state_tx.swap_flow(&trading_pair);
275
276 assert!(trading_pair.asset_1() == gm.id());
277
278 swap_flow.0 += gm.value(5u32.into()).amount;
280 swap_flow.1 += 0u32.into();
281
282 state_tx
284 .accumulate_swap_flow(&trading_pair, swap_flow.clone())
285 .await
286 .unwrap();
287 state_tx.apply();
288
289 let routing_params = state.routing_params().await.unwrap();
290 let max_execution = state.get_dex_params().await.unwrap().max_execution_budget;
291 state
293 .handle_batch_swaps(trading_pair, swap_flow, 0, routing_params, max_execution)
294 .await
295 .expect("unable to process batch swaps");
296 }
297}