penumbra_sdk_dex/component/
chandelier.rs

1use anyhow::Ok;
2use anyhow::{Context as _, Result};
3
4use cnidarium::{StateRead, StateWrite};
5use futures::{StreamExt, TryStreamExt as _};
6use penumbra_sdk_num::fixpoint::U128x128;
7use penumbra_sdk_proto::{DomainType, StateReadProto, StateWriteProto};
8use penumbra_sdk_sct::component::clock::EpochRead as _;
9use tonic::async_trait;
10
11use crate::event::EventCandlestickData;
12use crate::{lp::position::Position, state_key::candlesticks, DirectedTradingPair, SwapExecution};
13
14use crate::CandlestickData;
15
16#[async_trait]
17pub trait CandlestickRead: StateRead {
18    #[tracing::instrument(level = "debug", skip(self))]
19    async fn get_candlestick(
20        &self,
21        trading_pair: &DirectedTradingPair,
22        height: u64,
23    ) -> Result<Option<CandlestickData>> {
24        self.nonverifiable_get(
25            candlesticks::data::by_pair_and_height(trading_pair, height).as_bytes(),
26        )
27        .await
28    }
29
30    async fn candlesticks(
31        &self,
32        trading_pair: &DirectedTradingPair,
33        start_height: u64,
34        limit: usize,
35    ) -> Result<Vec<CandlestickData>> {
36        let prefix = candlesticks::data::by_pair(&trading_pair);
37        let start_height_key = format!("{:020}", start_height).as_bytes().to_vec();
38        tracing::trace!(
39            ?prefix,
40            ?start_height,
41            "searching for candlesticks from starting height"
42        );
43
44        let range = self
45            .nonverifiable_range_raw(Some(prefix.as_bytes()), start_height_key..)
46            .context("error forming range query")?;
47
48        range
49            .take(limit)
50            .and_then(|(_k, v)| async move {
51                CandlestickData::decode(v.as_ref()).context("error deserializing candlestick")
52            })
53            .try_collect()
54            .await
55    }
56}
57impl<T: StateRead + ?Sized> CandlestickRead for T {}
58
59#[async_trait]
60pub trait Chandelier: StateWrite {
61    #[tracing::instrument(level = "debug", skip(self))]
62    async fn record_position_execution(
63        &mut self,
64        prev_state: &Position,
65        new_state: &Position,
66    ) -> Result<()> {
67        // Redundant check that we do not record no-op executions.
68        if prev_state == new_state {
69            return Ok(());
70        }
71
72        // Determine the directionality of the trade.
73        // If the reserves of asset 1 increased and asset 2 decreased, the trade was for asset 1 -> asset 2.
74        // If the reserves of asset 2 increased and asset 1 decreased, the trade was for asset 2 -> asset 1.
75        let trading_pair = prev_state.phi.pair;
76        let directed_trading_pair = if new_state.reserves_for(trading_pair.asset_1)
77            > prev_state.reserves_for(trading_pair.asset_1)
78        {
79            DirectedTradingPair::new(trading_pair.asset_1, trading_pair.asset_2)
80        } else {
81            DirectedTradingPair::new(trading_pair.asset_2, trading_pair.asset_1)
82        };
83
84        let mut block_executions = self
85            .block_executions_by_pair(&directed_trading_pair)
86            .clone();
87
88        // The execution occurred at the price of the previous state.
89        let execution_price = prev_state
90            .phi
91            .orient_start(directed_trading_pair.end)
92            .context("recording position execution failed, position missing an end = asset 2")?
93            .effective_price();
94
95        // The volume can be found by the change in reserves of the input asset.
96        let direct_volume = (new_state
97            .reserves_for(directed_trading_pair.start)
98            .context("missing reserves")?
99            - prev_state
100                .reserves_for(directed_trading_pair.start)
101                .context("missing reserves")?)
102        .into();
103
104        tracing::debug!(
105            ?directed_trading_pair,
106            ?execution_price,
107            ?direct_volume,
108            "record position execution"
109        );
110        block_executions.push_back((execution_price, Some(direct_volume), None));
111        self.put_block_executions_by_pair(&directed_trading_pair, block_executions);
112
113        Ok(())
114    }
115
116    #[tracing::instrument(level = "debug", skip(self))]
117    async fn record_swap_execution(&mut self, swap: &SwapExecution) {
118        // Don't record a swap execution if the output amount was zero.
119        // This is not a superfluous check, as the swap execution could really
120        // have zero output e.g. in the case of a dust swap.
121        if swap.output.amount == 0u32.into() || swap.input.amount == 0u32.into() {
122            tracing::debug!(?swap, "skipping swap execution");
123            return;
124        }
125
126        let trading_pair: DirectedTradingPair = DirectedTradingPair {
127            start: swap.input.asset_id,
128            end: swap.output.asset_id,
129        };
130        let mut block_executions = self.block_executions_by_pair(&trading_pair).clone();
131
132        let execution_price = U128x128::ratio(swap.output.amount, swap.input.amount)
133            .expect("input amount is not zero");
134
135        // The volume is the amount of the input asset.
136        let swap_volume = swap.input.amount.into();
137
138        tracing::debug!(
139            ?trading_pair,
140            ?execution_price,
141            ?swap_volume,
142            "record swap execution"
143        );
144        block_executions.push_back((execution_price, None, Some(swap_volume)));
145        self.put_block_executions_by_pair(&trading_pair, block_executions);
146    }
147
148    #[tracing::instrument(level = "debug", skip(self))]
149    async fn finalize_block_candlesticks(&mut self) -> Result<()> {
150        let height = self.get_block_height().await?;
151
152        // Fetch all the executions for the block.
153        let block_executions = self.block_executions();
154
155        for (trading_pair, block_executions) in block_executions.iter() {
156            // Since the block executions are stored in order as they occurred during the block,
157            // we can iterate through them to create the candlestick data.
158            let mut open = None;
159            let mut close = 0.0;
160            let mut low = f64::INFINITY;
161            let mut high = 0.0;
162            let mut swap_volume = 0.0;
163            let mut direct_volume = 0.0;
164
165            // Create summary data on a per-trading pair basis.
166            for execution in block_executions {
167                let (price, direct, swap) = execution;
168
169                let price: f64 = (*price).into();
170
171                if open.is_none() {
172                    open = Some(price);
173                }
174
175                close = price;
176
177                if price > high {
178                    high = price;
179                }
180
181                if price < low {
182                    low = price;
183                }
184
185                if let Some(direct) = direct {
186                    direct_volume += f64::from(*direct);
187                }
188
189                if let Some(swap) = swap {
190                    swap_volume += f64::from(*swap);
191                }
192            }
193
194            // Store summary data in non-verifiable storage.
195            let candlestick = CandlestickData {
196                height,
197                open: open.unwrap_or(0.0),
198                close,
199                high,
200                low,
201                direct_volume,
202                swap_volume,
203            };
204            tracing::debug!(
205                ?height,
206                ?trading_pair,
207                ?candlestick,
208                "finalizing candlestick"
209            );
210            self.nonverifiable_put(
211                candlesticks::data::by_pair_and_height(&trading_pair, height).into(),
212                candlestick,
213            );
214            self.record_proto(
215                EventCandlestickData {
216                    pair: *trading_pair,
217                    stick: candlestick,
218                }
219                .to_proto(),
220            )
221        }
222
223        Ok(())
224    }
225}
226
227impl<T: StateWrite + ?Sized> Chandelier for T {}
228
229#[async_trait]
230trait Inner: StateWrite {
231    #[tracing::instrument(level = "debug", skip(self))]
232    fn block_executions(
233        &self,
234    ) -> im::HashMap<DirectedTradingPair, im::Vector<(U128x128, Option<U128x128>, Option<U128x128>)>>
235    {
236        self.object_get(candlesticks::object::block_executions())
237            .unwrap_or_default()
238    }
239
240    #[tracing::instrument(level = "debug", skip(self))]
241    fn block_executions_by_pair(
242        &self,
243        trading_pair: &DirectedTradingPair,
244    ) -> im::Vector<(U128x128, Option<U128x128>, Option<U128x128>)> {
245        let new = im::Vector::new();
246        let block_executions_map = self.block_executions();
247        block_executions_map
248            .get(trading_pair)
249            .unwrap_or_else(|| &new)
250            .clone()
251    }
252
253    #[tracing::instrument(level = "debug", skip(self))]
254    fn put_block_executions_by_pair(
255        &mut self,
256        trading_pair: &DirectedTradingPair,
257        block_executions: im::Vector<(U128x128, Option<U128x128>, Option<U128x128>)>,
258    ) {
259        let mut block_executions_map = self.block_executions();
260        block_executions_map.insert(trading_pair.clone(), block_executions);
261        self.object_put(
262            candlesticks::object::block_executions(),
263            block_executions_map,
264        );
265    }
266}
267impl<T: StateWrite + ?Sized> Inner for T {}
268
269#[cfg(test)]
270mod tests {
271    use std::sync::Arc;
272
273    use cnidarium::{ArcStateDeltaExt as _, StateDelta, TempStorage};
274    use cnidarium_component::Component as _;
275    use penumbra_sdk_asset::asset;
276    use penumbra_sdk_sct::{component::clock::EpochManager as _, epoch::Epoch};
277    use tendermint::abci;
278
279    use crate::{
280        component::{
281            router::create_buy, tests::TempStorageExt as _, Dex, PositionManager as _,
282            SwapDataRead, SwapDataWrite,
283        },
284        DirectedUnitPair,
285    };
286
287    use super::*;
288
289    #[tokio::test]
290    /// Perform basic tests of the chandelier.
291    async fn chandelier_basic() -> anyhow::Result<()> {
292        let _ = tracing_subscriber::fmt::try_init();
293        let storage = TempStorage::new().await?.apply_minimal_genesis().await?;
294
295        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
296        let mut state_tx = state.try_begin_transaction().unwrap();
297
298        state_tx.put_block_height(0);
299        state_tx.put_epoch_by_height(
300            0,
301            penumbra_sdk_sct::epoch::Epoch {
302                index: 0,
303                start_height: 0,
304            },
305        );
306        state_tx.apply();
307
308        storage.commit(Arc::try_unwrap(state).unwrap()).await?;
309        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
310
311        // Create a single position and execute a swap against it.
312        // We would expect to see direct flow and swap flow equal to each
313        // other, and the price from the position for open/close/high/low.
314
315        let penumbra = asset::Cache::with_known_assets()
316            .get_unit("penumbra")
317            .unwrap();
318        let gn = asset::Cache::with_known_assets().get_unit("gn").unwrap();
319
320        let pair_gn_penumbra = DirectedUnitPair::new(gn.clone(), penumbra.clone());
321
322        // Create a single 1:2 gn:penumbra position (i.e. buy 1 gn at 2 penumbra).
323        let mut state_tx = state.try_begin_transaction().unwrap();
324        let buy_1 = create_buy(pair_gn_penumbra.clone(), 1u64.into(), 2u64.into());
325        state_tx.open_position(buy_1).await.unwrap();
326        state_tx.apply();
327
328        // Now we should be able to fill a 1:1 gn:penumbra swap.
329        let trading_pair = pair_gn_penumbra.into_directed_trading_pair().into();
330
331        let mut swap_flow = state.swap_flow(&trading_pair);
332
333        assert!(trading_pair.asset_1() == penumbra.id());
334
335        // Add the amount of each asset being swapped to the batch swap flow.
336        swap_flow.0 += 0u32.into();
337        swap_flow.1 += gn.value(1u32.into()).amount;
338
339        // Accumulate it into the batch swap flow for the trading pair.
340        // Since this is currently empty this is the same as setting it.
341        Arc::get_mut(&mut state)
342            .unwrap()
343            .accumulate_swap_flow(&trading_pair, swap_flow.clone())
344            .await
345            .unwrap();
346
347        let height = 0u64;
348
349        // End the block so the chandelier is generated
350        let end_block = abci::request::EndBlock {
351            height: height.try_into().unwrap(),
352        };
353        Dex::end_block(&mut state, &end_block).await;
354
355        storage.commit(Arc::try_unwrap(state).unwrap()).await?;
356
357        // Begin a new block and have a few more positions and swaps
358        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
359        // set the epoch for the next block
360        let mut state_tx = state.try_begin_transaction().unwrap();
361        let height = 1u64;
362        state_tx.put_block_height(height);
363        state_tx.put_epoch_by_height(
364            height,
365            Epoch {
366                index: 0,
367                start_height: 0,
368            },
369        );
370        state_tx.apply();
371
372        // Check if the candlestick is set for height 0
373        assert!(
374            state
375                .get_candlestick(&pair_gn_penumbra.into_directed_trading_pair(), 0u64)
376                .await
377                .unwrap()
378                .is_some(),
379            "candlestick exists for height 0"
380        );
381
382        let cs = state
383            .get_candlestick(&pair_gn_penumbra.into_directed_trading_pair(), 0u64)
384            .await
385            .unwrap()
386            .unwrap();
387
388        let one_gn = gn.value(1u32.into());
389        let base_gn = gn.base();
390        let direct_volume: U128x128 = cs.direct_volume.try_into().unwrap();
391        let swap_volume: U128x128 = cs.swap_volume.try_into().unwrap();
392        assert_eq!(cs.height, 0u64, "height is 0");
393        assert_eq!(cs.open, 2.0, "open price is 2.0");
394        assert_eq!(cs.close, 2.0, "close price is 2.0");
395        assert_eq!(cs.high, 2.0, "high price is 2.0");
396        assert_eq!(cs.low, 2.0, "low price is 2.0");
397        assert_eq!(
398            base_gn.value(direct_volume.try_into().unwrap()),
399            one_gn,
400            "direct volume is 1 gn"
401        );
402        assert_eq!(
403            base_gn.value(swap_volume.try_into().unwrap()),
404            one_gn,
405            "swap volume is 1 gn"
406        );
407
408        // Create a single 1:2 gn:penumbra position (i.e. buy 1 gn at 2 penumbra).
409        let mut state_tx = state.try_begin_transaction().unwrap();
410        let buy_1 = create_buy(pair_gn_penumbra.clone(), 1u64.into(), 2u64.into());
411        state_tx.open_position(buy_1).await.unwrap();
412        state_tx.apply();
413
414        // Open a higher-priced position.
415        // Create a single 1:1 gn:penumbra position (i.e. buy 1 gn at 1 penumbra).
416        let mut state_tx = state.try_begin_transaction().unwrap();
417        let buy_2 = create_buy(pair_gn_penumbra.clone(), 1u64.into(), 1u64.into());
418        state_tx.open_position(buy_2).await.unwrap();
419        state_tx.apply();
420
421        // A swap for gn -> penumbra should first apply against the 1:2 position
422        // resulting in an opening price of 2.0
423        let mut swap_flow = state.swap_flow(&trading_pair);
424
425        assert!(trading_pair.asset_1() == penumbra.id());
426
427        // Add the amount of each asset being swapped to the batch swap flow.
428        swap_flow.0 += 0u32.into();
429        // Swap 2 gn into penumbra, meaning each position is filled.
430        swap_flow.1 += gn.value(2u32.into()).amount;
431
432        // Accumulate it into the batch swap flow for the trading pair.
433        // Since this is currently empty this is the same as setting it.
434        Arc::get_mut(&mut state)
435            .unwrap()
436            .accumulate_swap_flow(&trading_pair, swap_flow.clone())
437            .await
438            .unwrap();
439
440        // End the block so the chandelier is generated
441        let end_block = abci::request::EndBlock {
442            height: height.try_into().unwrap(),
443        };
444        Dex::end_block(&mut state, &end_block).await;
445        storage.commit(Arc::try_unwrap(state).unwrap()).await?;
446
447        // Begin a new block and have a few more positions and swaps
448        let state = Arc::new(StateDelta::new(storage.latest_snapshot()));
449        // Check if the candlestick is set for height 0
450        assert!(
451            state
452                .get_candlestick(&pair_gn_penumbra.into_directed_trading_pair(), height)
453                .await
454                .unwrap()
455                .is_some(),
456            "candlestick exists for height 1"
457        );
458
459        let cs = state
460            .get_candlestick(&pair_gn_penumbra.into_directed_trading_pair(), height)
461            .await
462            .unwrap()
463            .unwrap();
464
465        let two_gn = gn.value(2u32.into());
466        let base_gn = gn.base();
467        let direct_volume: U128x128 = cs.direct_volume.try_into().unwrap();
468        let swap_volume: U128x128 = cs.swap_volume.try_into().unwrap();
469        assert_eq!(cs.height, 1u64, "height is 1");
470        assert_eq!(cs.open, 2.0, "open price is 2.0");
471        assert_eq!(cs.close, 1.5, "close price is 1.5");
472        assert_eq!(cs.high, 2.0, "high price is 2.0");
473        assert_eq!(cs.low, 1.0, "low price is 1.0");
474        assert_eq!(
475            base_gn.value(direct_volume.try_into().unwrap()),
476            two_gn,
477            "direct volume is 2 gn"
478        );
479        assert_eq!(
480            base_gn.value(swap_volume.try_into().unwrap()),
481            two_gn,
482            "swap volume is 2 gn"
483        );
484        Ok(())
485    }
486}