penumbra_sdk_dex/component/position_manager/
volume_tracker.rs1use cnidarium::StateWrite;
2use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
3use penumbra_sdk_num::Amount;
4use penumbra_sdk_proto::StateWriteProto;
5use tracing::instrument;
6
7use crate::component::lqt::LqtRead;
8use crate::lp::position::{self, Position};
9use crate::state_key::lqt;
10use crate::{event, DirectedTradingPair};
11use async_trait::async_trait;
12use penumbra_sdk_proto::DomainType;
13use penumbra_sdk_sct::component::clock::EpochRead;
14
15#[async_trait]
16pub(crate) trait PositionVolumeTracker: StateWrite {
17 async fn update_volume_index(
18 &mut self,
19 position_id: &position::Id,
20 prev_state: &Option<Position>,
21 new_state: &Position,
22 ) {
23 if !new_state.phi.matches_input(*STAKING_TOKEN_ASSET_ID) {
25 return;
26 }
27
28 let Some(prev_state) = prev_state else {
30 tracing::debug!(?position_id, "newly opened position, skipping volume index");
31 return;
32 };
33
34 if !matches!(new_state.state, position::State::Opened) {
37 tracing::debug!(
38 ?position_id,
39 "new state is not `Opened`, skipping volume index"
40 );
41 return;
42 }
43
44 let pair = new_state.phi.pair;
45 let other_asset = if pair.asset_1 != *STAKING_TOKEN_ASSET_ID {
46 pair.asset_1
47 } else {
48 pair.asset_2
49 };
50 let flows = new_state
52 .flows(&prev_state)
53 .redirect(DirectedTradingPair {
54 start: *STAKING_TOKEN_ASSET_ID,
55 end: other_asset,
56 })
57 .expect("the staking token is in the pair");
58 let staking_token_outflow = flows.lambda_1();
65
66 let old_volume = self.get_volume_for_position(position_id).await;
68 let new_volume = old_volume.saturating_add(&staking_token_outflow);
69
70 let epoch_index = self
72 .get_current_epoch()
73 .await
74 .expect("epoch is always set")
75 .index;
76
77 self.record_proto(
78 event::EventLqtPositionVolume {
79 epoch_index,
80 position_id: position_id.clone(),
81 asset_id: other_asset,
82 volume: staking_token_outflow,
83 total_volume: new_volume,
84 staking_token_in: flows.delta_1(),
85 asset_in: flows.delta_2(),
86 staking_fees: flows.fee_1(),
87 asset_fees: flows.fee_2(),
88 }
89 .to_proto(),
90 );
91
92 self.update_volume(
93 epoch_index,
94 &other_asset,
95 position_id,
96 old_volume,
97 new_volume,
98 )
99 }
100}
101
102impl<T: StateWrite + ?Sized> PositionVolumeTracker for T {}
103
104trait Inner: StateWrite {
105 #[instrument(skip(self))]
106 fn update_volume(
107 &mut self,
108 epoch_index: u64,
109 asset_id: &asset::Id,
110 position_id: &position::Id,
111 old_volume: Amount,
112 new_volume: Amount,
113 ) {
114 let lookup_key = lqt::v1::lp::lookup::volume_by_position(epoch_index, position_id);
116 use penumbra_sdk_proto::StateWriteProto;
117 self.nonverifiable_put(lookup_key.to_vec(), new_volume);
118
119 let old_index_key =
121 lqt::v1::lp::by_volume::key(epoch_index, asset_id, position_id, old_volume);
122 self.nonverifiable_delete(old_index_key.to_vec());
124 let new_index_key =
126 lqt::v1::lp::by_volume::key(epoch_index, asset_id, position_id, new_volume);
127 self.nonverifiable_put(new_index_key.to_vec(), new_volume);
128 }
129}
130
131impl<T: StateWrite + ?Sized> Inner for T {}