1use async_trait::async_trait;
2use cnidarium::{StateRead, StateWrite};
3use penumbra_sdk_asset::Value;
4use penumbra_sdk_sct::{component::tree::SctManager, CommitmentSource};
5use penumbra_sdk_tct as tct;
6use tracing::instrument;
78use crate::component::circuit_breaker::value::ValueCircuitBreaker;
9use crate::BatchSwapOutputData;
10use crate::SwapExecution;
11use crate::{
12 component::flow::SwapFlow, state_key, swap::SwapPayload, DirectedTradingPair, TradingPair,
13};
14use anyhow::Result;
15use penumbra_sdk_proto::StateWriteProto;
1617/// Manages the addition of new notes to the chain state.
18#[async_trait]
19pub(crate) trait SwapManager: StateWrite {
20#[instrument(skip(self, swap), fields(commitment = ?swap.commitment))]
21async fn add_swap_payload(&mut self, swap: SwapPayload, source: CommitmentSource) {
22tracing::trace!("adding swap payload");
2324// Record the swap commitment and its metadata in the SCT
25let position = self.add_sct_commitment(swap.commitment, source.clone())
26 .await
27// TODO(erwan): Tracked in #830: we should handle this gracefully
28.expect("inserting into the state commitment tree should not fail because we should budget commitments per block (currently unimplemented)");
2930// Record the payload in object-storage so that we can include it in this block's [`CompactBlock`].
31let mut payloads = self.pending_swap_payloads();
32 payloads.push_back((position, swap, source));
33self.object_put(state_key::pending_payloads(), payloads);
34 }
35}
3637impl<T: StateWrite + ?Sized> SwapManager for T {}
3839pub trait SwapDataRead: StateRead {
40fn pending_swap_payloads(&self) -> im::Vector<(tct::Position, SwapPayload, CommitmentSource)> {
41self.object_get(state_key::pending_payloads())
42 .unwrap_or_default()
43 }
4445/// Get the swap flow for the given trading pair accumulated in this block so far.
46fn swap_flow(&self, pair: &TradingPair) -> SwapFlow {
47self.swap_flows().get(pair).cloned().unwrap_or_default()
48 }
4950fn swap_flows(&self) -> im::OrdMap<TradingPair, SwapFlow> {
51self.object_get::<im::OrdMap<TradingPair, SwapFlow>>(state_key::swap_flows())
52 .unwrap_or_default()
53 }
5455fn pending_batch_swap_outputs(&self) -> im::OrdMap<TradingPair, BatchSwapOutputData> {
56self.object_get(state_key::pending_outputs())
57 .unwrap_or_default()
58 }
59}
6061impl<T: StateRead + ?Sized> SwapDataRead for T {}
6263pub(crate) trait SwapDataWrite: StateWrite {
64async fn accumulate_swap_flow(
65&mut self,
66 trading_pair: &TradingPair,
67 swap_flow: SwapFlow,
68 ) -> Result<()> {
69// Credit the DEX for the swap inflows.
70 //
71 // At this point we don't know how much will eventually be filled, so we
72 // credit for all inflows, and then later debit for any unfilled input
73 // in the BSOD.
74self.dex_vcb_credit(Value {
75 amount: swap_flow.0,
76 asset_id: trading_pair.asset_1,
77 })
78 .await?;
79self.dex_vcb_credit(Value {
80 amount: swap_flow.1,
81 asset_id: trading_pair.asset_2,
82 })
83 .await?;
8485// Accumulate the new swap flow into the map.
86let old = self.swap_flows();
87let new = old.alter(
88 |maybe_flow| match maybe_flow {
89Some(flow) => Some((flow.0 + swap_flow.0, flow.1 + swap_flow.1).into()),
90None => Some(swap_flow),
91 },
92*trading_pair,
93 );
94self.object_put(state_key::swap_flows(), new);
9596Ok(())
97 }
9899fn put_swap_execution_at_height(
100&mut self,
101 height: u64,
102 pair: DirectedTradingPair,
103 swap_execution: SwapExecution,
104 ) {
105let path = state_key::swap_execution(height, pair);
106self.nonverifiable_put(path.as_bytes().to_vec(), swap_execution);
107 }
108}
109110impl<T: StateWrite + ?Sized> SwapDataWrite for T {}