penumbra_sdk_dex/component/position_manager/
volume_tracker.rs

1use cnidarium::StateWrite;
2use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
3use penumbra_sdk_num::Amount;
4use penumbra_sdk_proto::StateWriteProto;
5use tracing::instrument;
6
7use crate::component::lqt::LqtRead;
8use crate::lp::position::{self, Position};
9use crate::state_key::lqt;
10use crate::{event, DirectedTradingPair};
11use async_trait::async_trait;
12use penumbra_sdk_proto::DomainType;
13use penumbra_sdk_sct::component::clock::EpochRead;
14
15#[async_trait]
16pub(crate) trait PositionVolumeTracker: StateWrite {
17    async fn update_volume_index(
18        &mut self,
19        position_id: &position::Id,
20        prev_state: &Option<Position>,
21        new_state: &Position,
22    ) {
23        // We only index the volume for staking token pairs.
24        if !new_state.phi.matches_input(*STAKING_TOKEN_ASSET_ID) {
25            return;
26        }
27
28        // Or if the position has existed before.
29        let Some(prev_state) = prev_state else {
30            tracing::debug!(?position_id, "newly opened position, skipping volume index");
31            return;
32        };
33
34        // Short-circuit if the position is transitioning to a non-open state.
35        // This might miss some volume updates, but is more conservative on state-flow.
36        if !matches!(new_state.state, position::State::Opened) {
37            tracing::debug!(
38                ?position_id,
39                "new state is not `Opened`, skipping volume index"
40            );
41            return;
42        }
43
44        let pair = new_state.phi.pair;
45        let other_asset = if pair.asset_1 != *STAKING_TOKEN_ASSET_ID {
46            pair.asset_1
47        } else {
48            pair.asset_2
49        };
50        // Get the flows with the first asset being UM, and the other asset
51        let flows = new_state
52            .flows(&prev_state)
53            .redirect(DirectedTradingPair {
54                start: *STAKING_TOKEN_ASSET_ID,
55                end: other_asset,
56            })
57            .expect("the staking token is in the pair");
58        // We want to track the **outflow** of staking tokens from the position.
59        // This means that we track the amount of staking tokens that have left the position.
60        // We do this by comparing the previous and new reserves of the staking token.
61        // We **DO NOT** want to track the volume of the other asset denominated in staking tokens.
62        // We track the *outflow* of the staking token.
63        // "How much inventory has left the position?"
64        let staking_token_outflow = flows.lambda_1();
65
66        // We lookup the previous volume index entry.
67        let old_volume = self.get_volume_for_position(position_id).await;
68        let new_volume = old_volume.saturating_add(&staking_token_outflow);
69
70        // Grab the ambient epoch index.
71        let epoch_index = self
72            .get_current_epoch()
73            .await
74            .expect("epoch is always set")
75            .index;
76
77        self.record_proto(
78            event::EventLqtPositionVolume {
79                epoch_index,
80                position_id: position_id.clone(),
81                asset_id: other_asset,
82                volume: staking_token_outflow,
83                total_volume: new_volume,
84                staking_token_in: flows.delta_1(),
85                asset_in: flows.delta_2(),
86                staking_fees: flows.fee_1(),
87                asset_fees: flows.fee_2(),
88            }
89            .to_proto(),
90        );
91
92        self.update_volume(
93            epoch_index,
94            &other_asset,
95            position_id,
96            old_volume,
97            new_volume,
98        )
99    }
100}
101
102impl<T: StateWrite + ?Sized> PositionVolumeTracker for T {}
103
104trait Inner: StateWrite {
105    #[instrument(skip(self))]
106    fn update_volume(
107        &mut self,
108        epoch_index: u64,
109        asset_id: &asset::Id,
110        position_id: &position::Id,
111        old_volume: Amount,
112        new_volume: Amount,
113    ) {
114        // First, update the lookup index with the new volume.
115        let lookup_key = lqt::v1::lp::lookup::volume_by_position(epoch_index, position_id);
116        use penumbra_sdk_proto::StateWriteProto;
117        self.nonverifiable_put(lookup_key.to_vec(), new_volume);
118
119        // Then, update the sorted index:
120        let old_index_key =
121            lqt::v1::lp::by_volume::key(epoch_index, asset_id, position_id, old_volume);
122        // Delete the old key:
123        self.nonverifiable_delete(old_index_key.to_vec());
124        // Store the new one:
125        let new_index_key =
126            lqt::v1::lp::by_volume::key(epoch_index, asset_id, position_id, new_volume);
127        self.nonverifiable_put(new_index_key.to_vec(), new_volume);
128    }
129}
130
131impl<T: StateWrite + ?Sized> Inner for T {}