penumbra_sdk_dex/component/
chandelier.rs1use anyhow::Ok;
2use anyhow::{Context as _, Result};
3
4use cnidarium::{StateRead, StateWrite};
5use futures::{StreamExt, TryStreamExt as _};
6use penumbra_sdk_num::fixpoint::U128x128;
7use penumbra_sdk_proto::{DomainType, StateReadProto, StateWriteProto};
8use penumbra_sdk_sct::component::clock::EpochRead as _;
9use tonic::async_trait;
10
11use crate::event::EventCandlestickData;
12use crate::{lp::position::Position, state_key::candlesticks, DirectedTradingPair, SwapExecution};
13
14use crate::CandlestickData;
15
16#[async_trait]
17pub trait CandlestickRead: StateRead {
18 #[tracing::instrument(level = "debug", skip(self))]
19 async fn get_candlestick(
20 &self,
21 trading_pair: &DirectedTradingPair,
22 height: u64,
23 ) -> Result<Option<CandlestickData>> {
24 self.nonverifiable_get(
25 candlesticks::data::by_pair_and_height(trading_pair, height).as_bytes(),
26 )
27 .await
28 }
29
30 async fn candlesticks(
31 &self,
32 trading_pair: &DirectedTradingPair,
33 start_height: u64,
34 limit: usize,
35 ) -> Result<Vec<CandlestickData>> {
36 let prefix = candlesticks::data::by_pair(&trading_pair);
37 let start_height_key = format!("{:020}", start_height).as_bytes().to_vec();
38 tracing::trace!(
39 ?prefix,
40 ?start_height,
41 "searching for candlesticks from starting height"
42 );
43
44 let range = self
45 .nonverifiable_range_raw(Some(prefix.as_bytes()), start_height_key..)
46 .context("error forming range query")?;
47
48 range
49 .take(limit)
50 .and_then(|(_k, v)| async move {
51 CandlestickData::decode(v.as_ref()).context("error deserializing candlestick")
52 })
53 .try_collect()
54 .await
55 }
56}
57impl<T: StateRead + ?Sized> CandlestickRead for T {}
58
59#[async_trait]
60pub trait Chandelier: StateWrite {
61 #[tracing::instrument(level = "debug", skip(self))]
62 async fn record_position_execution(
63 &mut self,
64 prev_state: &Position,
65 new_state: &Position,
66 ) -> Result<()> {
67 if prev_state == new_state {
69 return Ok(());
70 }
71
72 let trading_pair = prev_state.phi.pair;
76 let directed_trading_pair = if new_state.reserves_for(trading_pair.asset_1)
77 > prev_state.reserves_for(trading_pair.asset_1)
78 {
79 DirectedTradingPair::new(trading_pair.asset_1, trading_pair.asset_2)
80 } else {
81 DirectedTradingPair::new(trading_pair.asset_2, trading_pair.asset_1)
82 };
83
84 let mut block_executions = self
85 .block_executions_by_pair(&directed_trading_pair)
86 .clone();
87
88 let execution_price = prev_state
90 .phi
91 .orient_start(directed_trading_pair.end)
92 .context("recording position execution failed, position missing an end = asset 2")?
93 .effective_price();
94
95 let direct_volume = (new_state
97 .reserves_for(directed_trading_pair.start)
98 .context("missing reserves")?
99 - prev_state
100 .reserves_for(directed_trading_pair.start)
101 .context("missing reserves")?)
102 .into();
103
104 tracing::debug!(
105 ?directed_trading_pair,
106 ?execution_price,
107 ?direct_volume,
108 "record position execution"
109 );
110 block_executions.push_back((execution_price, Some(direct_volume), None));
111 self.put_block_executions_by_pair(&directed_trading_pair, block_executions);
112
113 Ok(())
114 }
115
116 #[tracing::instrument(level = "debug", skip(self))]
117 async fn record_swap_execution(&mut self, swap: &SwapExecution) {
118 if swap.output.amount == 0u32.into() || swap.input.amount == 0u32.into() {
122 tracing::debug!(?swap, "skipping swap execution");
123 return;
124 }
125
126 let trading_pair: DirectedTradingPair = DirectedTradingPair {
127 start: swap.input.asset_id,
128 end: swap.output.asset_id,
129 };
130 let mut block_executions = self.block_executions_by_pair(&trading_pair).clone();
131
132 let execution_price = U128x128::ratio(swap.output.amount, swap.input.amount)
133 .expect("input amount is not zero");
134
135 let swap_volume = swap.input.amount.into();
137
138 tracing::debug!(
139 ?trading_pair,
140 ?execution_price,
141 ?swap_volume,
142 "record swap execution"
143 );
144 block_executions.push_back((execution_price, None, Some(swap_volume)));
145 self.put_block_executions_by_pair(&trading_pair, block_executions);
146 }
147
148 #[tracing::instrument(level = "debug", skip(self))]
149 async fn finalize_block_candlesticks(&mut self) -> Result<()> {
150 let height = self.get_block_height().await?;
151
152 let block_executions = self.block_executions();
154
155 for (trading_pair, block_executions) in block_executions.iter() {
156 let mut open = None;
159 let mut close = 0.0;
160 let mut low = f64::INFINITY;
161 let mut high = 0.0;
162 let mut swap_volume = 0.0;
163 let mut direct_volume = 0.0;
164
165 for execution in block_executions {
167 let (price, direct, swap) = execution;
168
169 let price: f64 = (*price).into();
170
171 if open.is_none() {
172 open = Some(price);
173 }
174
175 close = price;
176
177 if price > high {
178 high = price;
179 }
180
181 if price < low {
182 low = price;
183 }
184
185 if let Some(direct) = direct {
186 direct_volume += f64::from(*direct);
187 }
188
189 if let Some(swap) = swap {
190 swap_volume += f64::from(*swap);
191 }
192 }
193
194 let candlestick = CandlestickData {
196 height,
197 open: open.unwrap_or(0.0),
198 close,
199 high,
200 low,
201 direct_volume,
202 swap_volume,
203 };
204 tracing::debug!(
205 ?height,
206 ?trading_pair,
207 ?candlestick,
208 "finalizing candlestick"
209 );
210 self.nonverifiable_put(
211 candlesticks::data::by_pair_and_height(&trading_pair, height).into(),
212 candlestick,
213 );
214 self.record_proto(
215 EventCandlestickData {
216 pair: *trading_pair,
217 stick: candlestick,
218 }
219 .to_proto(),
220 )
221 }
222
223 Ok(())
224 }
225}
226
227impl<T: StateWrite + ?Sized> Chandelier for T {}
228
229#[async_trait]
230trait Inner: StateWrite {
231 #[tracing::instrument(level = "debug", skip(self))]
232 fn block_executions(
233 &self,
234 ) -> im::HashMap<DirectedTradingPair, im::Vector<(U128x128, Option<U128x128>, Option<U128x128>)>>
235 {
236 self.object_get(candlesticks::object::block_executions())
237 .unwrap_or_default()
238 }
239
240 #[tracing::instrument(level = "debug", skip(self))]
241 fn block_executions_by_pair(
242 &self,
243 trading_pair: &DirectedTradingPair,
244 ) -> im::Vector<(U128x128, Option<U128x128>, Option<U128x128>)> {
245 let new = im::Vector::new();
246 let block_executions_map = self.block_executions();
247 block_executions_map
248 .get(trading_pair)
249 .unwrap_or_else(|| &new)
250 .clone()
251 }
252
253 #[tracing::instrument(level = "debug", skip(self))]
254 fn put_block_executions_by_pair(
255 &mut self,
256 trading_pair: &DirectedTradingPair,
257 block_executions: im::Vector<(U128x128, Option<U128x128>, Option<U128x128>)>,
258 ) {
259 let mut block_executions_map = self.block_executions();
260 block_executions_map.insert(trading_pair.clone(), block_executions);
261 self.object_put(
262 candlesticks::object::block_executions(),
263 block_executions_map,
264 );
265 }
266}
267impl<T: StateWrite + ?Sized> Inner for T {}
268
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use cnidarium::{ArcStateDeltaExt as _, StateDelta, TempStorage};
    use cnidarium_component::Component as _;
    use penumbra_sdk_asset::asset;
    use penumbra_sdk_sct::{component::clock::EpochManager as _, epoch::Epoch};
    use tendermint::abci;

    use crate::{
        component::{
            router::create_buy, tests::TempStorageExt as _, Dex, PositionManager as _,
            SwapDataRead, SwapDataWrite,
        },
        DirectedUnitPair,
    };

    use super::*;

    /// End-to-end check: runs two blocks through `Dex::end_block` and verifies the
    /// candlestick stored for the gn -> penumbra pair at each height.
    #[tokio::test]
    async fn chandelier_basic() -> anyhow::Result<()> {
        let _ = tracing_subscriber::fmt::try_init();
        let storage = TempStorage::new().await?.apply_minimal_genesis().await?;

        // Seed height 0 and its epoch, then commit so later reads see them.
        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
        let mut state_tx = state.try_begin_transaction().unwrap();

        state_tx.put_block_height(0);
        state_tx.put_epoch_by_height(
            0,
            Epoch {
                index: 0,
                start_height: 0,
            },
        );
        state_tx.apply();

        storage.commit(Arc::try_unwrap(state).unwrap()).await?;
        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));

        let penumbra = asset::Cache::with_known_assets()
            .get_unit("penumbra")
            .unwrap();
        let gn = asset::Cache::with_known_assets().get_unit("gn").unwrap();

        let pair_gn_penumbra = DirectedUnitPair::new(gn.clone(), penumbra.clone());

        // ---- Block 0: one position, one gn swapped in ----
        // NOTE(review): `create_buy` arguments are presumably (pair, quantity, price);
        // the assertions below observe a price of 2.0 — confirm against the router.
        let mut state_tx = state.try_begin_transaction().unwrap();
        let buy_1 = create_buy(pair_gn_penumbra.clone(), 1u64.into(), 2u64.into());
        state_tx.open_position(buy_1).await.unwrap();
        state_tx.apply();

        let trading_pair = pair_gn_penumbra.into_directed_trading_pair().into();

        let mut swap_flow = state.swap_flow(&trading_pair);

        // Penumbra must be asset 1 of the pair, so the gn flow goes in slot `.1`.
        assert_eq!(trading_pair.asset_1(), penumbra.id());

        // No penumbra flows in; one gn does.
        swap_flow.0 += 0u32.into();
        swap_flow.1 += gn.value(1u32.into()).amount;

        Arc::get_mut(&mut state)
            .unwrap()
            .accumulate_swap_flow(&trading_pair, swap_flow)
            .await
            .unwrap();

        let height = 0u64;

        let end_block = abci::request::EndBlock {
            height: height.try_into().unwrap(),
        };
        Dex::end_block(&mut state, &end_block).await;

        storage.commit(Arc::try_unwrap(state).unwrap()).await?;

        // ---- Block 1 setup ----
        let mut state = Arc::new(StateDelta::new(storage.latest_snapshot()));
        let mut state_tx = state.try_begin_transaction().unwrap();
        let height = 1u64;
        state_tx.put_block_height(height);
        state_tx.put_epoch_by_height(
            height,
            Epoch {
                index: 0,
                start_height: 0,
            },
        );
        state_tx.apply();

        // The candlestick for block 0 must have been written by `end_block`.
        let cs = state
            .get_candlestick(&pair_gn_penumbra.into_directed_trading_pair(), 0u64)
            .await
            .unwrap()
            .expect("candlestick exists for height 0");

        let one_gn = gn.value(1u32.into());
        let base_gn = gn.base();
        let direct_volume: U128x128 = cs.direct_volume.try_into().unwrap();
        let swap_volume: U128x128 = cs.swap_volume.try_into().unwrap();
        assert_eq!(cs.height, 0u64, "height is 0");
        assert_eq!(cs.open, 2.0, "open price is 2.0");
        assert_eq!(cs.close, 2.0, "close price is 2.0");
        assert_eq!(cs.high, 2.0, "high price is 2.0");
        assert_eq!(cs.low, 2.0, "low price is 2.0");
        assert_eq!(
            base_gn.value(direct_volume.try_into().unwrap()),
            one_gn,
            "direct volume is 1 gn"
        );
        assert_eq!(
            base_gn.value(swap_volume.try_into().unwrap()),
            one_gn,
            "swap volume is 1 gn"
        );

        // ---- Block 1: two positions at different prices, two gn swapped in ----
        let mut state_tx = state.try_begin_transaction().unwrap();
        let buy_1 = create_buy(pair_gn_penumbra.clone(), 1u64.into(), 2u64.into());
        state_tx.open_position(buy_1).await.unwrap();
        state_tx.apply();

        let mut state_tx = state.try_begin_transaction().unwrap();
        let buy_2 = create_buy(pair_gn_penumbra.clone(), 1u64.into(), 1u64.into());
        state_tx.open_position(buy_2).await.unwrap();
        state_tx.apply();

        let mut swap_flow = state.swap_flow(&trading_pair);

        assert_eq!(trading_pair.asset_1(), penumbra.id());

        // No penumbra flows in; two gn do.
        swap_flow.0 += 0u32.into();
        swap_flow.1 += gn.value(2u32.into()).amount;

        Arc::get_mut(&mut state)
            .unwrap()
            .accumulate_swap_flow(&trading_pair, swap_flow)
            .await
            .unwrap();

        let end_block = abci::request::EndBlock {
            height: height.try_into().unwrap(),
        };
        Dex::end_block(&mut state, &end_block).await;
        storage.commit(Arc::try_unwrap(state).unwrap()).await?;

        let state = Arc::new(StateDelta::new(storage.latest_snapshot()));
        let cs = state
            .get_candlestick(&pair_gn_penumbra.into_directed_trading_pair(), height)
            .await
            .unwrap()
            .expect("candlestick exists for height 1");

        let two_gn = gn.value(2u32.into());
        let base_gn = gn.base();
        let direct_volume: U128x128 = cs.direct_volume.try_into().unwrap();
        let swap_volume: U128x128 = cs.swap_volume.try_into().unwrap();
        assert_eq!(cs.height, 1u64, "height is 1");
        assert_eq!(cs.open, 2.0, "open price is 2.0");
        // NOTE(review): 1.5 is presumably the aggregate swap price (3 penumbra out for
        // 2 gn in), recorded after both position fills — confirm.
        assert_eq!(cs.close, 1.5, "close price is 1.5");
        assert_eq!(cs.high, 2.0, "high price is 2.0");
        assert_eq!(cs.low, 1.0, "low price is 1.0");
        assert_eq!(
            base_gn.value(direct_volume.try_into().unwrap()),
            two_gn,
            "direct volume is 2 gn"
        );
        assert_eq!(
            base_gn.value(swap_volume.try_into().unwrap()),
            two_gn,
            "swap volume is 2 gn"
        );
        Ok(())
    }
}