// penumbra_dex/component/swap_manager.rs
use async_trait::async_trait;
use cnidarium::{StateRead, StateWrite};
use penumbra_asset::Value;
use penumbra_sct::{component::tree::SctManager, CommitmentSource};
use penumbra_tct as tct;
use tracing::instrument;
use crate::component::circuit_breaker::value::ValueCircuitBreaker;
use crate::BatchSwapOutputData;
use crate::SwapExecution;
use crate::{
component::flow::SwapFlow, state_key, swap::SwapPayload, DirectedTradingPair, TradingPair,
};
use anyhow::Result;
use penumbra_proto::StateWriteProto;
/// Write-side extension trait for registering swap payloads in the state.
#[async_trait]
pub(crate) trait SwapManager: StateWrite {
    /// Records a swap payload together with the source of its commitment.
    ///
    /// The swap's commitment is first inserted through `add_sct_commitment`;
    /// the resulting position, the payload and its source are then appended
    /// to the list returned by `pending_swap_payloads`, and that list is
    /// written back under `state_key::pending_payloads()`.
    ///
    /// # Panics
    ///
    /// Panics if `add_sct_commitment` returns an error.
    #[instrument(skip(self, swap), fields(commitment = ?swap.commitment))]
    async fn add_swap_payload(&mut self, swap: SwapPayload, source: CommitmentSource) {
        tracing::trace!("adding swap payload");

        // `source` is needed again below, so the commitment insertion gets a clone.
        let insertion = self
            .add_sct_commitment(swap.commitment, source.clone())
            .await;
        let sct_position = insertion
            .expect("inserting into the state commitment tree should not fail because we should budget commitments per block (currently unimplemented)");

        // Read-modify-write of the pending payload list.
        let mut pending = self.pending_swap_payloads();
        pending.push_back((sct_position, swap, source));
        self.object_put(state_key::pending_payloads(), pending);
    }
}
/// Blanket implementation: every `StateWrite` type (sized or not) is a `SwapManager`.
impl<T> SwapManager for T where T: StateWrite + ?Sized {}
pub trait SwapDataRead: StateRead {
fn pending_swap_payloads(&self) -> im::Vector<(tct::Position, SwapPayload, CommitmentSource)> {
self.object_get(state_key::pending_payloads())
.unwrap_or_default()
}
fn swap_flow(&self, pair: &TradingPair) -> SwapFlow {
self.swap_flows().get(pair).cloned().unwrap_or_default()
}
fn swap_flows(&self) -> im::OrdMap<TradingPair, SwapFlow> {
self.object_get::<im::OrdMap<TradingPair, SwapFlow>>(state_key::swap_flows())
.unwrap_or_default()
}
fn pending_batch_swap_outputs(&self) -> im::OrdMap<TradingPair, BatchSwapOutputData> {
self.object_get(state_key::pending_outputs())
.unwrap_or_default()
}
}
/// Blanket implementation: every `StateRead` type (sized or not) is a `SwapDataRead`.
impl<T> SwapDataRead for T where T: StateRead + ?Sized {}
/// Write-side helpers for swap flows and swap execution records.
// NOTE(review): unlike `SwapManager`, this trait carries no `#[async_trait]`
// attribute, so `accumulate_swap_flow` relies on native async-fn-in-trait
// support — confirm that this is intentional.
pub(crate) trait SwapDataWrite: StateWrite {
    /// Adds `swap_flow` to the flow accumulated for `trading_pair`.
    ///
    /// Both components of the flow are first credited through
    /// `dex_vcb_credit` (component `0` against `asset_1`, component `1`
    /// against `asset_2`). The map returned by `swap_flows` is then updated
    /// and written back under `state_key::swap_flows()`.
    ///
    /// # Errors
    ///
    /// Propagates the error of either `dex_vcb_credit` call; in that case the
    /// stored flow map is left untouched.
    async fn accumulate_swap_flow(
        &mut self,
        trading_pair: &TradingPair,
        swap_flow: SwapFlow,
    ) -> Result<()> {
        let inflow_1 = Value {
            amount: swap_flow.0,
            asset_id: trading_pair.asset_1,
        };
        let inflow_2 = Value {
            amount: swap_flow.1,
            asset_id: trading_pair.asset_2,
        };

        // Credit both sides before touching the flow map; `?` bails out early.
        self.dex_vcb_credit(inflow_1).await?;
        self.dex_vcb_credit(inflow_2).await?;

        let updated_flows = self.swap_flows().alter(
            |existing| {
                // Sum component-wise with an existing entry, or start a new one.
                let merged: SwapFlow = match existing {
                    Some(flow) => (flow.0 + swap_flow.0, flow.1 + swap_flow.1).into(),
                    None => swap_flow,
                };
                Some(merged)
            },
            *trading_pair,
        );
        self.object_put(state_key::swap_flows(), updated_flows);

        Ok(())
    }

    /// Stores `swap_execution` via `nonverifiable_put`, keyed by the bytes of
    /// `state_key::swap_execution(height, pair)`.
    fn put_swap_execution_at_height(
        &mut self,
        height: u64,
        pair: DirectedTradingPair,
        swap_execution: SwapExecution,
    ) {
        let key_bytes = state_key::swap_execution(height, pair).as_bytes().to_vec();
        self.nonverifiable_put(key_bytes, swap_execution);
    }
}
/// Blanket implementation: every `StateWrite` type (sized or not) is a `SwapDataWrite`.
impl<T> SwapDataWrite for T where T: StateWrite + ?Sized {}