penumbra_sdk_dex/component/
swap_manager.rs

1use async_trait::async_trait;
2use cnidarium::{StateRead, StateWrite};
3use penumbra_sdk_asset::Value;
4use penumbra_sdk_sct::{component::tree::SctManager, CommitmentSource};
5use penumbra_sdk_tct as tct;
6use tracing::instrument;
7
8use crate::component::circuit_breaker::value::ValueCircuitBreaker;
9use crate::BatchSwapOutputData;
10use crate::SwapExecution;
11use crate::{
12    component::flow::SwapFlow, state_key, swap::SwapPayload, DirectedTradingPair, TradingPair,
13};
14use anyhow::Result;
15use penumbra_sdk_proto::StateWriteProto;
16
17/// Manages the addition of new notes to the chain state.
18#[async_trait]
19pub(crate) trait SwapManager: StateWrite {
20    #[instrument(skip(self, swap), fields(commitment = ?swap.commitment))]
21    async fn add_swap_payload(&mut self, swap: SwapPayload, source: CommitmentSource) {
22        tracing::trace!("adding swap payload");
23
24        // Record the swap commitment and its metadata in the SCT
25        let position = self.add_sct_commitment(swap.commitment, source.clone())
26            .await
27            // TODO(erwan): Tracked in #830: we should handle this gracefully
28            .expect("inserting into the state commitment tree should not fail because we should budget commitments per block (currently unimplemented)");
29
30        // Record the payload in object-storage so that we can include it in this block's [`CompactBlock`].
31        let mut payloads = self.pending_swap_payloads();
32        payloads.push_back((position, swap, source));
33        self.object_put(state_key::pending_payloads(), payloads);
34    }
35}
36
37impl<T: StateWrite + ?Sized> SwapManager for T {}
38
39pub trait SwapDataRead: StateRead {
40    fn pending_swap_payloads(&self) -> im::Vector<(tct::Position, SwapPayload, CommitmentSource)> {
41        self.object_get(state_key::pending_payloads())
42            .unwrap_or_default()
43    }
44
45    /// Get the swap flow for the given trading pair accumulated in this block so far.
46    fn swap_flow(&self, pair: &TradingPair) -> SwapFlow {
47        self.swap_flows().get(pair).cloned().unwrap_or_default()
48    }
49
50    fn swap_flows(&self) -> im::OrdMap<TradingPair, SwapFlow> {
51        self.object_get::<im::OrdMap<TradingPair, SwapFlow>>(state_key::swap_flows())
52            .unwrap_or_default()
53    }
54
55    fn pending_batch_swap_outputs(&self) -> im::OrdMap<TradingPair, BatchSwapOutputData> {
56        self.object_get(state_key::pending_outputs())
57            .unwrap_or_default()
58    }
59}
60
61impl<T: StateRead + ?Sized> SwapDataRead for T {}
62
63pub(crate) trait SwapDataWrite: StateWrite {
64    async fn accumulate_swap_flow(
65        &mut self,
66        trading_pair: &TradingPair,
67        swap_flow: SwapFlow,
68    ) -> Result<()> {
69        // Credit the DEX for the swap inflows.
70        //
71        // At this point we don't know how much will eventually be filled, so we
72        // credit for all inflows, and then later debit for any unfilled input
73        // in the BSOD.
74        self.dex_vcb_credit(Value {
75            amount: swap_flow.0,
76            asset_id: trading_pair.asset_1,
77        })
78        .await?;
79        self.dex_vcb_credit(Value {
80            amount: swap_flow.1,
81            asset_id: trading_pair.asset_2,
82        })
83        .await?;
84
85        // Accumulate the new swap flow into the map.
86        let old = self.swap_flows();
87        let new = old.alter(
88            |maybe_flow| match maybe_flow {
89                Some(flow) => Some((flow.0 + swap_flow.0, flow.1 + swap_flow.1).into()),
90                None => Some(swap_flow),
91            },
92            *trading_pair,
93        );
94        self.object_put(state_key::swap_flows(), new);
95
96        Ok(())
97    }
98
99    fn put_swap_execution_at_height(
100        &mut self,
101        height: u64,
102        pair: DirectedTradingPair,
103        swap_execution: SwapExecution,
104    ) {
105        let path = state_key::swap_execution(height, pair);
106        self.nonverifiable_put(path.as_bytes().to_vec(), swap_execution);
107    }
108}
109
110impl<T: StateWrite + ?Sized> SwapDataWrite for T {}