1use anyhow::anyhow;
2use clap::Result;
3use cometindex::{
4 async_trait,
5 index::{BlockEvents, EventBatch, EventBatchContext, Version},
6 AppView, PgTransaction,
7};
8use penumbra_sdk_asset::{asset, Value, STAKING_TOKEN_ASSET_ID};
9use penumbra_sdk_dex::{
10 event::{
11 EventArbExecution, EventBatchSwap, EventPositionClose, EventPositionExecution,
12 EventPositionOpen, EventPositionWithdraw, EventQueuePositionClose,
13 },
14 lp::{position::Flows, Reserves},
15 DirectedTradingPair, SwapExecution, TradingPair,
16};
17use penumbra_sdk_dex::{
18 event::{EventSwap, EventSwapClaim},
19 lp::position::{Id as PositionId, Position},
20};
21use penumbra_sdk_funding::event::EventLqtPositionReward;
22use penumbra_sdk_num::Amount;
23use penumbra_sdk_proto::event::EventDomainType;
24use penumbra_sdk_proto::DomainType;
25use penumbra_sdk_sct::event::EventBlockRoot;
26use penumbra_sdk_transaction::Transaction;
27use sqlx::types::BigDecimal;
28use sqlx::Row;
29use std::collections::{BTreeMap, HashMap, HashSet};
30
31type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
32
33mod candle {
34 use super::DateTime;
35 use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
36 use penumbra_sdk_dex::CandlestickData;
37 use std::fmt::Display;
38
    /// Open/close/low/high prices together with accumulated volumes.
    ///
    /// A candle starts out as a single point (see `Candle::point`) or as a copy of
    /// chain-provided data, and grows by having later candles merged into it.
    #[derive(Debug, Clone, Copy)]
    pub struct Candle {
        /// Price of the earliest point in this candle; `merge` never changes it.
        pub open: f64,
        /// Price of the most recently merged point.
        pub close: f64,
        /// Minimum price over everything merged so far.
        pub low: f64,
        /// Maximum price over everything merged so far.
        pub high: f64,
        /// Running sum of direct volume.
        pub direct_volume: f64,
        /// Running sum of swap volume.
        pub swap_volume: f64,
    }
52
53 impl Candle {
54 pub fn from_candlestick_data(data: &CandlestickData) -> Self {
55 Self {
59 open: data.open,
60 close: data.close,
61 low: data.low,
62 high: data.high,
63 direct_volume: data.direct_volume,
64 swap_volume: data.swap_volume,
65 }
66 }
67
68 pub fn point(price: f64, direct_volume: f64) -> Self {
69 Self {
70 open: price,
71 close: price,
72 low: price,
73 high: price,
74 direct_volume,
75 swap_volume: 0.0,
76 }
77 }
78
79 pub fn merge(&mut self, that: &Self) {
80 self.close = that.close;
81 self.low = self.low.min(that.low);
82 self.high = self.high.max(that.high);
83 self.direct_volume += that.direct_volume;
84 self.swap_volume += that.swap_volume;
85 }
86 }
87
    /// The bucket sizes that candles and summaries are aggregated over.
    ///
    /// The derived ordering follows declaration order, i.e. from the shortest window to
    /// the longest.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub enum Window {
        /// One minute.
        W1m,
        /// Fifteen minutes.
        W15m,
        /// One hour.
        W1h,
        /// Four hours.
        W4h,
        /// One day.
        W1d,
        /// One week; buckets are anchored to Monday.
        W1w,
        /// One month; anchored to the first day of the calendar month, but treated as
        /// 30 days by `subtract_from`.
        W1mo,
    }
98
99 impl Window {
100 pub fn all() -> impl Iterator<Item = Self> {
101 [
102 Window::W1m,
103 Window::W15m,
104 Window::W1h,
105 Window::W4h,
106 Window::W1d,
107 Window::W1w,
108 Window::W1mo,
109 ]
110 .into_iter()
111 }
112
113 pub fn anchor(&self, time: DateTime) -> DateTime {
120 let (y, mo, d, h, m) = (
121 time.year(),
122 time.month(),
123 time.day(),
124 time.hour(),
125 time.minute(),
126 );
127 let out = match self {
128 Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
129 Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
130 Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
131 Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
132 Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
133 Window::W1w => Utc
134 .with_ymd_and_hms(y, mo, d, 0, 0, 0)
135 .single()
136 .and_then(|x| {
137 x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
138 }),
139 Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
140 };
141 out.unwrap()
142 }
143
144 pub fn subtract_from(&self, time: DateTime) -> DateTime {
145 let delta = match self {
146 Window::W1m => TimeDelta::minutes(1),
147 Window::W15m => TimeDelta::minutes(15),
148 Window::W1h => TimeDelta::hours(1),
149 Window::W4h => TimeDelta::hours(4),
150 Window::W1d => TimeDelta::days(1),
151 Window::W1w => TimeDelta::weeks(1),
152 Window::W1mo => TimeDelta::days(30),
153 };
154 time - delta
155 }
156 }
157
158 impl Display for Window {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 use Window::*;
161 let str = match self {
162 W1m => "1m",
163 W15m => "15m",
164 W1h => "1h",
165 W4h => "4h",
166 W1d => "1d",
167 W1w => "1w",
168 W1mo => "1mo",
169 };
170 write!(f, "{}", str)
171 }
172 }
173
    /// A [`Candle`] pinned to the start of the [`Window`] bucket it is accumulating.
    #[derive(Debug)]
    pub struct WindowedCandle {
        /// Anchored start time of the bucket currently being accumulated.
        start: DateTime,
        /// Bucket size used to anchor incoming timestamps.
        window: Window,
        /// Candle accumulated so far for the current bucket.
        candle: Candle,
    }
180
181 impl WindowedCandle {
182 pub fn new(now: DateTime, window: Window, candle: Candle) -> Self {
183 Self {
184 start: window.anchor(now),
185 window,
186 candle,
187 }
188 }
189
190 pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> {
195 let start = self.window.anchor(now);
196 if start > self.start {
198 let old_start = std::mem::replace(&mut self.start, start);
199 let old_candle = std::mem::replace(&mut self.candle, candle);
200 Some((old_start, old_candle))
201 } else {
202 self.candle.merge(&candle);
203 None
204 }
205 }
206
207 pub fn window(&self) -> Window {
208 self.window
209 }
210
211 pub fn flush(self) -> (DateTime, Candle) {
212 (self.start, self.candle)
213 }
214 }
215}
216pub use candle::{Candle, Window, WindowedCandle};
217
218mod price_chart {
219 use super::*;
220
    /// In-memory state for one price chart: a directed asset pair at one window size.
    #[derive(Debug)]
    pub struct Context {
        /// Start asset of the directed pair.
        asset_start: asset::Id,
        /// End asset of the directed pair.
        asset_end: asset::Id,
        /// Window size of the candles in this chart.
        window: Window,
        /// The candle currently being accumulated, if any has been seen or loaded.
        state: Option<WindowedCandle>,
    }
229
230 impl Context {
231 pub async fn load(
232 dbtx: &mut PgTransaction<'_>,
233 asset_start: asset::Id,
234 asset_end: asset::Id,
235 window: Window,
236 ) -> anyhow::Result<Self> {
237 let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as(
238 "
239 SELECT open, close, high, low, direct_volume, swap_volume, start_time
240 FROM dex_ex_price_charts
241 WHERE asset_start = $1
242 AND asset_end = $2
243 AND the_window = $3
244 ORDER BY start_time DESC
245 LIMIT 1
246 ",
247 )
248 .bind(asset_start.to_bytes())
249 .bind(asset_end.to_bytes())
250 .bind(window.to_string())
251 .fetch_optional(dbtx.as_mut())
252 .await?;
253 let state = row.map(
254 |(open, close, high, low, direct_volume, swap_volume, start)| {
255 let candle = Candle {
256 open,
257 close,
258 low,
259 high,
260 direct_volume,
261 swap_volume,
262 };
263 WindowedCandle::new(start, window, candle)
264 },
265 );
266 Ok(Self {
267 asset_start,
268 asset_end,
269 window,
270 state,
271 })
272 }
273
        /// Upserts one candle of this chart into `dex_ex_price_charts`.
        ///
        /// The row is keyed on `(asset_start, asset_end, the_window, start_time)`; if a
        /// row with that key already exists, its prices and volumes are overwritten with
        /// the values from `candle`.
        ///
        /// # Errors
        ///
        /// Returns an error if the statement fails.
        async fn write_candle(
            &self,
            dbtx: &mut PgTransaction<'_>,
            start: DateTime,
            candle: Candle,
        ) -> anyhow::Result<()> {
            sqlx::query(
                "
                INSERT INTO dex_ex_price_charts(
                    id, asset_start, asset_end, the_window,
                    start_time, open, close, high, low, direct_volume, swap_volume
                )
                VALUES(
                    DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
                )
                ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET
                    open = EXCLUDED.open,
                    close = EXCLUDED.close,
                    high = EXCLUDED.high,
                    low = EXCLUDED.low,
                    direct_volume = EXCLUDED.direct_volume,
                    swap_volume = EXCLUDED.swap_volume
                ",
            )
            // Bind order must follow the column list above: `high` ($7) before `low` ($8).
            .bind(self.asset_start.to_bytes())
            .bind(self.asset_end.to_bytes())
            .bind(self.window.to_string())
            .bind(start)
            .bind(candle.open)
            .bind(candle.close)
            .bind(candle.high)
            .bind(candle.low)
            .bind(candle.direct_volume)
            .bind(candle.swap_volume)
            .execute(dbtx.as_mut())
            .await?;
            Ok(())
        }
312
313 pub async fn update(
314 &mut self,
315 dbtx: &mut PgTransaction<'_>,
316 now: DateTime,
317 candle: Candle,
318 ) -> anyhow::Result<()> {
319 let state = match self.state.as_mut() {
320 Some(x) => x,
321 None => {
322 self.state = Some(WindowedCandle::new(now, self.window, candle));
323 self.state.as_mut().unwrap()
324 }
325 };
326 if let Some((start, old_candle)) = state.with_candle(now, candle) {
327 self.write_candle(dbtx, start, old_candle).await?;
328 };
329 Ok(())
330 }
331
332 pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
333 let state = std::mem::replace(&mut self.state, None);
334 if let Some(state) = state {
335 let (start, candle) = state.flush();
336 self.write_candle(dbtx, start, candle).await?;
337 }
338 Ok(())
339 }
340 }
341}
342
343use price_chart::Context as PriceChartContext;
344
345mod summary {
346 use cometindex::PgTransaction;
347 use penumbra_sdk_asset::asset;
348
349 use super::{Candle, DateTime, PairMetrics, Window};
350
    /// Running per-block snapshot state for one directed asset pair.
    pub struct Context {
        /// Start asset of the directed pair.
        start: asset::Id,
        /// End asset of the directed pair.
        end: asset::Id,
        /// Most recent price, taken from the close of the last candle seen.
        price: f64,
        /// Running liquidity total, adjusted by each block's liquidity change.
        liquidity: f64,
        /// Most recent price supplied for the start asset in the indexing denom.
        start_price_indexing_denom: f64,
    }
358
359 impl Context {
360 pub async fn load(
361 dbtx: &mut PgTransaction<'_>,
362 start: asset::Id,
363 end: asset::Id,
364 ) -> anyhow::Result<Self> {
365 let row: Option<(f64, f64, f64)> = sqlx::query_as(
366 "
367 SELECT price, liquidity, start_price_indexing_denom
368 FROM dex_ex_pairs_block_snapshot
369 WHERE asset_start = $1
370 AND asset_end = $2
371 ORDER BY id DESC
372 LIMIT 1
373 ",
374 )
375 .bind(start.to_bytes())
376 .bind(end.to_bytes())
377 .fetch_optional(dbtx.as_mut())
378 .await?;
379 let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
380 Ok(Self {
381 start,
382 end,
383 price,
384 liquidity,
385 start_price_indexing_denom,
386 })
387 }
388
389 pub async fn update(
390 &mut self,
391 dbtx: &mut PgTransaction<'_>,
392 now: DateTime,
393 candle: Option<Candle>,
394 metrics: PairMetrics,
395 start_price_indexing_denom: Option<f64>,
396 ) -> anyhow::Result<()> {
397 if let Some(candle) = candle {
398 self.price = candle.close;
399 }
400 if let Some(price) = start_price_indexing_denom {
401 self.start_price_indexing_denom = price;
402 }
403 self.liquidity += metrics.liquidity_change;
404
405 sqlx::query(
406 "
407 INSERT INTO dex_ex_pairs_block_snapshot VALUES (
408 DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
409 )
410 ",
411 )
412 .bind(now)
413 .bind(self.start.to_bytes())
414 .bind(self.end.to_bytes())
415 .bind(self.price)
416 .bind(self.liquidity)
417 .bind(candle.map(|x| x.direct_volume).unwrap_or_default())
418 .bind(candle.map(|x| x.swap_volume).unwrap_or_default())
419 .bind(self.start_price_indexing_denom)
420 .bind(metrics.trades)
421 .execute(dbtx.as_mut())
422 .await?;
423 Ok(())
424 }
425 }
426
    /// Recomputes the `dex_ex_pairs_summary` row of every known pair for one window.
    ///
    /// For each distinct `(asset_start, asset_end)` in `dex_ex_pairs_block_snapshot`, one
    /// statement derives, from that pair's snapshots:
    ///
    /// * `previous`: price and liquidity of the latest snapshot at or before `then`
    ///   (`now` minus the window length), defaulting to 0.0 when there is none;
    /// * `now`: price and liquidity of the latest snapshot at or before `now`;
    /// * `sums`: volumes, indexing-denom volumes, trades, and min/max price over the
    ///   snapshots between `then` and `now` inclusive.
    ///
    /// The result is upserted keyed on `(asset_start, asset_end, the_window)`.
    ///
    /// Query parameters: `$1` start asset, `$2` end asset, `$3` `now`, `$4` `then`,
    /// `$5` the window label.
    ///
    /// NOTE(review): the `DO UPDATE SET` list does not assign the `low`/`high` values
    /// that the INSERT supplies, so on conflict an existing row appears to keep its old
    /// extremes. Confirm against the table schema whether that is intended.
    ///
    /// # Errors
    ///
    /// Returns an error if any query fails.
    pub async fn update_summaries(
        dbtx: &mut PgTransaction<'_>,
        now: DateTime,
        window: Window,
    ) -> anyhow::Result<()> {
        let then = window.subtract_from(now);
        let pairs = sqlx::query_as::<_, (Vec<u8>, Vec<u8>)>(
            "SELECT asset_start, asset_end FROM dex_ex_pairs_block_snapshot GROUP BY asset_start, asset_end",
        )
        .fetch_all(dbtx.as_mut())
        .await?;
        for (start, end) in pairs {
            sqlx::query(
                "
                WITH
                snapshots AS (
                    SELECT *
                    FROM dex_ex_pairs_block_snapshot
                    WHERE asset_start = $1
                    AND asset_end = $2
                ),
                previous AS (
                    SELECT price AS price_then, liquidity AS liquidity_then
                    FROM snapshots
                    WHERE time <= $4
                    ORDER BY time DESC
                    LIMIT 1
                ),
                previous_or_default AS (
                    SELECT
                        COALESCE((SELECT price_then FROM previous), 0.0) AS price_then,
                        COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then
                ),
                now AS (
                    SELECT price, liquidity
                    FROM snapshots
                    WHERE time <= $3
                    ORDER BY time DESC
                    LIMIT 1
                ),
                sums AS (
                    SELECT
                        COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
                        COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
                        COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
                        COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
                        COALESCE(SUM(trades), 0.0) AS trades_over_window,
                        COALESCE(MIN(price), 0.0) AS low,
                        COALESCE(MAX(price), 0.0) AS high
                    FROM snapshots
                    WHERE time <= $3
                    AND time >= $4
                )
                INSERT INTO dex_ex_pairs_summary
                SELECT
                    $1, $2, $5,
                    price, price_then,
                    low, high,
                    liquidity, liquidity_then,
                    direct_volume_over_window,
                    swap_volume_over_window,
                    direct_volume_indexing_denom_over_window,
                    swap_volume_indexing_denom_over_window,
                    trades_over_window
                FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
                ON CONFLICT (asset_start, asset_end, the_window)
                DO UPDATE SET
                    price = EXCLUDED.price,
                    price_then = EXCLUDED.price_then,
                    liquidity = EXCLUDED.liquidity,
                    liquidity_then = EXCLUDED.liquidity_then,
                    direct_volume_over_window = EXCLUDED.direct_volume_over_window,
                    swap_volume_over_window = EXCLUDED.swap_volume_over_window,
                    direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
                    swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
                    trades_over_window = EXCLUDED.trades_over_window
                ",
            )
            .bind(start)
            .bind(end)
            .bind(now)
            .bind(then)
            .bind(window.to_string())
            .execute(dbtx.as_mut())
            .await?;
        }
        Ok(())
    }
515
    /// Recomputes the single `dex_ex_aggregate_summary` row for `window`.
    ///
    /// Everything happens in one statement over `dex_ex_pairs_summary`:
    ///
    /// * `eligible_denoms`: assets that have a summary row against `denom` (as the end
    ///   asset) with liquidity of at least `min_liquidity`, each with its price, plus
    ///   `denom` itself at price 1.0.
    /// * `converted_pairs_summary`: summary rows for this window whose start and end
    ///   assets are both eligible, with the percentage price change and with liquidity
    ///   multiplied by the end asset's price.
    /// * `sums` / `total_liquidity`: totals of volumes and trades (counting only rows
    ///   with `asset_start < asset_end`) and of liquidity (over all converted rows).
    /// * `undirected_pairs_summary`: per unordered pair, the volumes summed over both
    ///   directions; used for the active-pair count and the largest-volume pairs.
    /// * `top_price_mover`: the converted row with the largest price change.
    ///
    /// The result is upserted keyed on `the_window`. Because the final SELECT inner-joins
    /// the single-row CTEs, nothing is written when any of them is empty.
    ///
    /// Query parameters: `$1` `denom`, `$2` `min_liquidity`, `$3` the window label.
    ///
    /// # Errors
    ///
    /// Returns an error if the statement fails.
    pub async fn update_aggregate_summary(
        dbtx: &mut PgTransaction<'_>,
        window: Window,
        denom: asset::Id,
        min_liquidity: f64,
    ) -> anyhow::Result<()> {
        sqlx::query(
            "
            WITH
            eligible_denoms AS (
                SELECT asset_start as asset, price
                FROM dex_ex_pairs_summary
                WHERE asset_end = $1
                AND liquidity >= $2
                UNION VALUES ($1, 1.0)
            ),
            converted_pairs_summary AS (
                SELECT
                    asset_start, asset_end,
                    (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
                    liquidity * ed_end.price AS liquidity,
                    direct_volume_indexing_denom_over_window AS dv,
                    swap_volume_indexing_denom_over_window AS sv,
                    trades_over_window as trades
                FROM dex_ex_pairs_summary
                JOIN eligible_denoms AS ed_end
                ON ed_end.asset = asset_end
                JOIN eligible_denoms AS ed_start
                ON ed_start.asset = asset_start
                WHERE the_window = $3
            ),
            sums AS (
                SELECT
                    SUM(dv) AS direct_volume,
                    SUM(sv) AS swap_volume,
                    SUM(trades) AS trades
                FROM converted_pairs_summary WHERE asset_start < asset_end
            ),
            total_liquidity AS (
                SELECT
                    SUM(liquidity) AS liquidity
                FROM converted_pairs_summary
            ),
            undirected_pairs_summary AS (
                WITH pairs AS (
                    SELECT asset_start AS a_start, asset_end AS a_end FROM converted_pairs_summary WHERE asset_start < asset_end
                )
                SELECT
                    a_start AS asset_start,
                    a_end AS asset_end,
                    (SELECT SUM(sv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS sv,
                    (SELECT SUM(dv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS dv
                FROM pairs
            ),
            counts AS (
                SELECT COUNT(*) AS active_pairs FROM undirected_pairs_summary WHERE dv > 0
            ),
            largest_sv AS (
                SELECT
                    asset_start AS largest_sv_trading_pair_start,
                    asset_end AS largest_sv_trading_pair_end,
                    sv AS largest_sv_trading_pair_volume
                FROM undirected_pairs_summary
                ORDER BY sv DESC
                LIMIT 1
            ),
            largest_dv AS (
                SELECT
                    asset_start AS largest_dv_trading_pair_start,
                    asset_end AS largest_dv_trading_pair_end,
                    dv AS largest_dv_trading_pair_volume
                FROM undirected_pairs_summary
                ORDER BY dv DESC
                LIMIT 1
            ),
            top_price_mover AS (
                SELECT
                    asset_start AS top_price_mover_start,
                    asset_end AS top_price_mover_end,
                    price_change AS top_price_mover_change_percent
                FROM converted_pairs_summary
                ORDER BY price_change DESC
                LIMIT 1
            )
            INSERT INTO dex_ex_aggregate_summary
            SELECT
                $3,
                direct_volume, swap_volume, liquidity, trades, active_pairs,
                largest_sv_trading_pair_start,
                largest_sv_trading_pair_end,
                largest_sv_trading_pair_volume,
                largest_dv_trading_pair_start,
                largest_dv_trading_pair_end,
                largest_dv_trading_pair_volume,
                top_price_mover_start,
                top_price_mover_end,
                top_price_mover_change_percent
            FROM
                sums
                JOIN largest_sv ON TRUE
                JOIN largest_dv ON TRUE
                JOIN top_price_mover ON TRUE
                JOIN total_liquidity ON TRUE
                JOIN counts ON TRUE
            ON CONFLICT (the_window) DO UPDATE SET
                direct_volume = EXCLUDED.direct_volume,
                swap_volume = EXCLUDED.swap_volume,
                liquidity = EXCLUDED.liquidity,
                trades = EXCLUDED.trades,
                active_pairs = EXCLUDED.active_pairs,
                largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start,
                largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end,
                largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume,
                largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start,
                largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end,
                largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume,
                top_price_mover_start = EXCLUDED.top_price_mover_start,
                top_price_mover_end = EXCLUDED.top_price_mover_end,
                top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent
            ")
        .bind(denom.to_bytes())
        .bind(min_liquidity)
        .bind(window.to_string())
        .execute(dbtx.as_mut())
        .await?;
        Ok(())
    }
644}
645
646mod metadata {
647 use super::*;
648
649 pub async fn set(dbtx: &mut PgTransaction<'_>, quote_asset: asset::Id) -> anyhow::Result<()> {
650 sqlx::query(
651 "
652 INSERT INTO dex_ex_metadata
653 VALUES (1, $1)
654 ON CONFLICT (id) DO UPDATE
655 SET id = EXCLUDED.id,
656 quote_asset_id = EXCLUDED.quote_asset_id
657 ",
658 )
659 .bind(quote_asset.to_bytes())
660 .execute(dbtx.as_mut())
661 .await?;
662 Ok(())
663 }
664}
665
/// Per-block counters for one directed trading pair.
#[derive(Debug, Default, Clone, Copy)]
struct PairMetrics {
    /// Number of trades seen for the pair in the block (kept as a float).
    trades: f64,
    /// Net change in the pair's liquidity over the block.
    liquidity_change: f64,
}
671
/// Summary of one direction of a batch swap, serialized into the block summary row.
#[derive(Debug, Clone, serde::Serialize)]
struct BatchSwapSummary {
    /// Asset that was swapped in.
    asset_start: asset::Id,
    /// Asset that was swapped out.
    asset_end: asset::Id,
    /// Total input amount of the swap execution.
    input: Amount,
    /// Total output amount of the swap execution.
    output: Amount,
    /// Number of individual swaps in the block that contributed input in this direction.
    num_swaps: i32,
    /// `output / input`, computed in floating point.
    price_float: f64,
}
681
/// Everything extracted from a single block's events that this component cares about.
#[derive(Debug)]
struct Events {
    /// Block timestamp, set when a block root event is seen.
    time: Option<DateTime>,
    /// Block height.
    height: i32,
    /// Candle accumulated for each directed pair traded in the block.
    candles: HashMap<DirectedTradingPair, Candle>,
    /// Trade count and liquidity change for each directed pair touched in the block.
    metrics: HashMap<DirectedTradingPair, PairMetrics>,
    /// Positions known for this block: those opened in it, plus any loaded from the
    /// database for executions.
    positions: BTreeMap<PositionId, Position>,
    /// Position-open events, in block order.
    position_opens: Vec<EventPositionOpen>,
    /// Position-execution events, in block order.
    position_executions: Vec<EventPositionExecution>,
    /// Position-close events, in block order.
    position_closes: Vec<EventPositionClose>,
    /// Position-withdraw events, in block order.
    position_withdrawals: Vec<EventPositionWithdraw>,
    /// Batch-swap events, in block order.
    batch_swaps: Vec<EventBatchSwap>,
    /// Swap events grouped by trading pair.
    swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
    /// Swap-claim events grouped by trading pair.
    swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
    /// Hash of the transaction that opened each position, when the event had one.
    position_open_txs: BTreeMap<PositionId, [u8; 32]>,
    /// Hash of the transaction that queued each position's close, when the event had one.
    position_close_txs: BTreeMap<PositionId, [u8; 32]>,
    /// Hash of the transaction that withdrew each position, when the event had one.
    position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
}
703
704impl Events {
    /// Creates an empty event set: no timestamp, height 0, and every collection empty.
    fn new() -> Self {
        Self {
            time: None,
            height: 0,
            candles: HashMap::new(),
            metrics: HashMap::new(),
            positions: BTreeMap::new(),
            position_opens: Vec::new(),
            position_executions: Vec::new(),
            position_closes: Vec::new(),
            position_withdrawals: Vec::new(),
            batch_swaps: Vec::new(),
            swaps: BTreeMap::new(),
            swap_claims: BTreeMap::new(),
            position_open_txs: BTreeMap::new(),
            position_close_txs: BTreeMap::new(),
            position_withdrawal_txs: BTreeMap::new(),
        }
    }
724
725 fn with_time(&mut self, time: DateTime) {
726 self.time = Some(time)
727 }
728
729 fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
730 if !self.metrics.contains_key(pair) {
731 self.metrics.insert(*pair, PairMetrics::default());
732 }
733 self.metrics.get_mut(pair).unwrap()
735 }
736
737 fn with_trace(&mut self, input: Value, output: Value) {
738 let pair_1_2 = DirectedTradingPair {
739 start: input.asset_id,
740 end: output.asset_id,
741 };
742 if pair_1_2.start == pair_1_2.end {
744 tracing::warn!(?input, ?output, "found trace with a loop?");
745 return;
746 }
747 let pair_2_1 = pair_1_2.flip();
748 let input_amount = f64::from(input.amount);
749 let output_amount = f64::from(output.amount);
750 if input_amount == 0.0 && output_amount == 0.0 {
751 tracing::warn!(?input, ?output, "ignoring trace with 0 input and output");
752 return;
753 }
754 let price_1_2 = output_amount / input_amount;
755 let candle_1_2 = Candle::point(price_1_2, input_amount);
756 let candle_2_1 = Candle::point(1.0 / price_1_2, output_amount);
757 self.metric(&pair_1_2).trades += 1.0;
758 self.candles
759 .entry(pair_1_2)
760 .and_modify(|c| c.merge(&candle_1_2))
761 .or_insert(candle_1_2);
762 self.metric(&pair_2_1).trades += 1.0;
763 self.candles
764 .entry(pair_2_1)
765 .and_modify(|c| c.merge(&candle_2_1))
766 .or_insert(candle_2_1);
767 }
768
769 fn with_swap_execution(&mut self, se: &SwapExecution) {
770 for row in se.traces.iter() {
771 for window in row.windows(2) {
772 self.with_trace(window[0], window[1]);
773 }
774 }
775 let pair_1_2 = DirectedTradingPair {
776 start: se.input.asset_id,
777 end: se.output.asset_id,
778 };
779 if pair_1_2.start == pair_1_2.end {
782 return;
783 }
784 let pair_2_1 = pair_1_2.flip();
785 self.candles
786 .entry(pair_1_2)
787 .and_modify(|c| c.swap_volume += f64::from(se.input.amount));
788 self.candles
789 .entry(pair_2_1)
790 .and_modify(|c| c.swap_volume += f64::from(se.output.amount));
791 }
792
793 fn with_reserve_change(
794 &mut self,
795 pair: &TradingPair,
796 old_reserves: Option<Reserves>,
797 new_reserves: Reserves,
798 removed: bool,
799 ) {
800 let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
801 (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
802 (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
803 (_, Some(old), new) => (
804 (new.r1.value() as f64) - (old.r1.value() as f64),
805 (new.r2.value() as f64) - (old.r2.value() as f64),
806 ),
807 };
808 for (d_pair, diff) in [
809 (
810 DirectedTradingPair {
811 start: pair.asset_1(),
812 end: pair.asset_2(),
813 },
814 diff_2,
815 ),
816 (
817 DirectedTradingPair {
818 start: pair.asset_2(),
819 end: pair.asset_1(),
820 },
821 diff_1,
822 ),
823 ] {
824 self.metric(&d_pair).liquidity_change += diff;
825 }
826 }
827
828 pub fn extract(block: &BlockEvents, ignore_arb_executions: bool) -> anyhow::Result<Self> {
829 let mut out = Self::new();
830 out.height = block.height() as i32;
831
832 for event in block.events() {
833 if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
834 let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
835 "creating timestamp should succeed; timestamp: {}",
836 e.timestamp_seconds
837 ))?;
838 out.with_time(time);
839 } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
840 out.with_reserve_change(
841 &e.trading_pair,
842 None,
843 Reserves {
844 r1: e.reserves_1,
845 r2: e.reserves_2,
846 },
847 false,
848 );
849 if let Some(tx_hash) = event.tx_hash() {
850 out.position_open_txs.insert(e.position_id, tx_hash);
851 }
852 out.positions.insert(e.position_id, e.position.clone());
856 out.position_opens.push(e);
857 } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
858 out.with_reserve_change(
861 &e.trading_pair,
862 None,
863 Reserves {
864 r1: e.reserves_1,
865 r2: e.reserves_2,
866 },
867 true,
868 );
869 if let Some(tx_hash) = event.tx_hash() {
870 out.position_withdrawal_txs.insert(e.position_id, tx_hash);
871 }
872 out.position_withdrawals.push(e);
873 } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
874 out.with_reserve_change(
875 &e.trading_pair,
876 Some(Reserves {
877 r1: e.prev_reserves_1,
878 r2: e.prev_reserves_2,
879 }),
880 Reserves {
881 r1: e.reserves_1,
882 r2: e.reserves_2,
883 },
884 false,
885 );
886 out.position_executions.push(e);
887 } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) {
888 out.position_closes.push(e);
889 } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
890 if let Some(tx_hash) = event.tx_hash() {
893 out.position_close_txs.insert(e.position_id, tx_hash);
894 }
895 } else if let Ok(e) = EventSwap::try_from_event(&event.event) {
896 out.swaps
897 .entry(e.trading_pair)
898 .or_insert_with(Vec::new)
899 .push(e);
900 } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
901 out.swap_claims
902 .entry(e.trading_pair)
903 .or_insert_with(Vec::new)
904 .push(e);
905 } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
906 let pair = DirectedTradingPair {
907 start: e.incentivized_asset_id,
908 end: *STAKING_TOKEN_ASSET_ID,
909 };
910 out.metric(&pair).liquidity_change += e.reward_amount.value() as f64;
911 } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
912 if let Some(se) = e.swap_execution_1_for_2.as_ref() {
914 out.with_swap_execution(se);
915 }
916 if let Some(se) = e.swap_execution_2_for_1.as_ref() {
917 out.with_swap_execution(se);
918 }
919 out.batch_swaps.push(e);
920 } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) {
921 if !ignore_arb_executions {
922 out.with_swap_execution(&e.swap_execution);
923 }
924 }
925 }
926 Ok(out)
927 }
928
929 async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
930 let missing_positions: Vec<_> = self
932 .position_executions
933 .iter()
934 .map(|e| e.position_id)
935 .filter(|id| !self.positions.contains_key(id))
936 .collect();
937
938 if missing_positions.is_empty() {
939 return Ok(());
940 }
941
942 let rows = sqlx::query(
944 "SELECT position_raw
945 FROM dex_ex_position_state
946 WHERE position_id = ANY($1)",
947 )
948 .bind(
949 &missing_positions
950 .iter()
951 .map(|id| id.0.as_ref())
952 .collect::<Vec<_>>(),
953 )
954 .fetch_all(dbtx.as_mut())
955 .await?;
956
957 for row in rows {
959 let position_raw: Vec<u8> = row.get("position_raw");
960 let position = Position::decode(position_raw.as_slice())?;
961 self.positions.insert(position.id(), position);
962 }
963
964 Ok(())
965 }
966
967 pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
969 self.candles
970 .get(&DirectedTradingPair::new(asset, indexing_denom))
971 .map(|x| x.close)
972 }
973}
974
/// The DEX explorer indexing component and its configuration.
#[derive(Debug)]
pub struct Component {
    /// Asset used as the indexing (quote) denomination.
    denom: asset::Id,
    /// Liquidity threshold; presumably the one passed to the aggregate summary to
    /// decide which assets are eligible (TODO confirm against the caller).
    min_liquidity: f64,
    /// Whether arbitrage executions are ignored; presumably forwarded to
    /// `Events::extract` (TODO confirm against the caller).
    ignore_arb_executions: bool,
}
981
982impl Component {
    /// Creates a component from its three configuration values, stored as given.
    pub fn new(denom: asset::Id, min_liquidity: f64, ignore_arb_executions: bool) -> Self {
        Self {
            denom,
            min_liquidity,
            ignore_arb_executions,
        }
    }
990
991 async fn record_position_open(
992 &self,
993 dbtx: &mut PgTransaction<'_>,
994 time: DateTime,
995 height: i32,
996 tx_hash: Option<[u8; 32]>,
997 event: &EventPositionOpen,
998 ) -> anyhow::Result<()> {
999 let effective_price_1_to_2: f64 = event
1001 .position
1002 .phi
1003 .orient_start(event.trading_pair.asset_1())
1004 .expect("position trading pair matches")
1005 .effective_price()
1006 .into();
1007
1008 let effective_price_2_to_1: f64 = event
1009 .position
1010 .phi
1011 .orient_start(event.trading_pair.asset_2())
1012 .expect("position trading pair matches")
1013 .effective_price()
1014 .into();
1015
1016 let opening_reserves_rowid = sqlx::query_scalar::<_, i32>(
1018 "INSERT INTO dex_ex_position_reserves (
1019 position_id,
1020 height,
1021 time,
1022 reserves_1,
1023 reserves_2
1024 ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1025 )
1026 .bind(event.position_id.0)
1027 .bind(height)
1028 .bind(time)
1029 .bind(BigDecimal::from(event.reserves_1.value()))
1030 .bind(BigDecimal::from(event.reserves_2.value()))
1031 .fetch_one(dbtx.as_mut())
1032 .await?;
1033
1034 sqlx::query(
1036 "INSERT INTO dex_ex_position_state (
1037 position_id,
1038 asset_1,
1039 asset_2,
1040 p,
1041 q,
1042 close_on_fill,
1043 fee_bps,
1044 effective_price_1_to_2,
1045 effective_price_2_to_1,
1046 position_raw,
1047 opening_time,
1048 opening_height,
1049 opening_tx,
1050 opening_reserves_rowid
1051 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1052 )
1053 .bind(event.position_id.0)
1054 .bind(event.trading_pair.asset_1().to_bytes())
1055 .bind(event.trading_pair.asset_2().to_bytes())
1056 .bind(BigDecimal::from(event.position.phi.component.p.value()))
1057 .bind(BigDecimal::from(event.position.phi.component.q.value()))
1058 .bind(event.position.close_on_fill)
1059 .bind(event.trading_fee as i32)
1060 .bind(effective_price_1_to_2)
1061 .bind(effective_price_2_to_1)
1062 .bind(event.position.encode_to_vec())
1063 .bind(time)
1064 .bind(height)
1065 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1066 .bind(opening_reserves_rowid)
1067 .execute(dbtx.as_mut())
1068 .await?;
1069
1070 Ok(())
1071 }
1072
1073 async fn record_swap_execution_traces(
1074 &self,
1075 dbtx: &mut PgTransaction<'_>,
1076 time: DateTime,
1077 height: i32,
1078 swap_execution: &SwapExecution,
1079 ) -> anyhow::Result<()> {
1080 let SwapExecution {
1081 traces,
1082 input: se_input,
1083 output: se_output,
1084 } = swap_execution;
1085
1086 let asset_start = se_input.asset_id;
1087 let asset_end = se_output.asset_id;
1088 let batch_input = se_input.amount;
1089 let batch_output = se_output.amount;
1090
1091 for trace in traces.iter() {
1092 let Some(input_value) = trace.first() else {
1093 continue;
1094 };
1095 let Some(output_value) = trace.last() else {
1096 continue;
1097 };
1098
1099 let input = input_value.amount;
1100 let output = output_value.amount;
1101
1102 let price_float = (output.value() as f64) / (input.value() as f64);
1103 let amount_hops = trace
1104 .iter()
1105 .map(|x| BigDecimal::from(x.amount.value()))
1106 .collect::<Vec<_>>();
1107 let position_id_hops: Vec<[u8; 32]> = vec![];
1108 let asset_hops = trace
1109 .iter()
1110 .map(|x| x.asset_id.to_bytes())
1111 .collect::<Vec<_>>();
1112
1113 sqlx::query(
1114 "INSERT INTO dex_ex_batch_swap_traces (
1115 height,
1116 time,
1117 input,
1118 output,
1119 batch_input,
1120 batch_output,
1121 price_float,
1122 asset_start,
1123 asset_end,
1124 asset_hops,
1125 amount_hops,
1126 position_id_hops
1127 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1128 )
1129 .bind(height)
1130 .bind(time)
1131 .bind(BigDecimal::from(input.value()))
1132 .bind(BigDecimal::from(output.value()))
1133 .bind(BigDecimal::from(batch_input.value()))
1134 .bind(BigDecimal::from(batch_output.value()))
1135 .bind(price_float)
1136 .bind(asset_start.to_bytes())
1137 .bind(asset_end.to_bytes())
1138 .bind(asset_hops)
1139 .bind(amount_hops)
1140 .bind(position_id_hops)
1141 .execute(dbtx.as_mut())
1142 .await?;
1143 }
1144
1145 Ok(())
1146 }
1147
1148 async fn record_block_summary(
1149 &self,
1150 dbtx: &mut PgTransaction<'_>,
1151 time: DateTime,
1152 height: i32,
1153 events: &Events,
1154 ) -> anyhow::Result<()> {
1155 let num_opened_lps = events.position_opens.len() as i32;
1156 let num_closed_lps = events.position_closes.len() as i32;
1157 let num_withdrawn_lps = events.position_withdrawals.len() as i32;
1158 let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
1159 let num_swap_claims = events
1160 .swap_claims
1161 .iter()
1162 .map(|(_, v)| v.len())
1163 .sum::<usize>() as i32;
1164 let num_txs = events.batch_swaps.len() as i32;
1165
1166 let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();
1167
1168 for event in &events.batch_swaps {
1169 let trading_pair = event.batch_swap_output_data.trading_pair;
1170
1171 if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
1172 let asset_start = swap_1_2.input.asset_id;
1173 let asset_end = swap_1_2.output.asset_id;
1174 let input = swap_1_2.input.amount;
1175 let output = swap_1_2.output.amount;
1176 let price_float = (output.value() as f64) / (input.value() as f64);
1177
1178 let empty_vec = vec![];
1179 let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1180 let filtered_swaps: Vec<_> = swaps_for_pair
1181 .iter()
1182 .filter(|swap| swap.delta_1_i != Amount::zero())
1183 .collect::<Vec<_>>();
1184 let num_swaps = filtered_swaps.len() as i32;
1185
1186 batch_swap_summaries.push(BatchSwapSummary {
1187 asset_start,
1188 asset_end,
1189 input,
1190 output,
1191 num_swaps,
1192 price_float,
1193 });
1194 }
1195
1196 if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
1197 let asset_start = swap_2_1.input.asset_id;
1198 let asset_end = swap_2_1.output.asset_id;
1199 let input = swap_2_1.input.amount;
1200 let output = swap_2_1.output.amount;
1201 let price_float = (output.value() as f64) / (input.value() as f64);
1202
1203 let empty_vec = vec![];
1204 let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1205 let filtered_swaps: Vec<_> = swaps_for_pair
1206 .iter()
1207 .filter(|swap| swap.delta_2_i != Amount::zero())
1208 .collect::<Vec<_>>();
1209 let num_swaps = filtered_swaps.len() as i32;
1210
1211 batch_swap_summaries.push(BatchSwapSummary {
1212 asset_start,
1213 asset_end,
1214 input,
1215 output,
1216 num_swaps,
1217 price_float,
1218 });
1219 }
1220 }
1221
1222 sqlx::query(
1223 "INSERT INTO dex_ex_block_summary (
1224 height,
1225 time,
1226 batch_swaps,
1227 num_open_lps,
1228 num_closed_lps,
1229 num_withdrawn_lps,
1230 num_swaps,
1231 num_swap_claims,
1232 num_txs
1233 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
1234 )
1235 .bind(height)
1236 .bind(time)
1237 .bind(serde_json::to_value(&batch_swap_summaries)?)
1238 .bind(num_opened_lps)
1239 .bind(num_closed_lps)
1240 .bind(num_withdrawn_lps)
1241 .bind(num_swaps)
1242 .bind(num_swap_claims)
1243 .bind(num_txs)
1244 .execute(dbtx.as_mut())
1245 .await?;
1246
1247 Ok(())
1248 }
1249
1250 async fn record_batch_swap_traces(
1251 &self,
1252 dbtx: &mut PgTransaction<'_>,
1253 time: DateTime,
1254 height: i32,
1255 event: &EventBatchSwap,
1256 ) -> anyhow::Result<()> {
1257 let EventBatchSwap {
1258 batch_swap_output_data: _,
1259 swap_execution_1_for_2,
1260 swap_execution_2_for_1,
1261 } = event;
1262
1263 if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
1264 self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
1265 .await?;
1266 }
1267
1268 if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
1269 self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
1270 .await?;
1271 }
1272
1273 Ok(())
1274 }
1275
1276 async fn record_position_execution(
1277 &self,
1278 dbtx: &mut PgTransaction<'_>,
1279 time: DateTime,
1280 height: i32,
1281 event: &EventPositionExecution,
1282 positions: &BTreeMap<PositionId, Position>,
1283 ) -> anyhow::Result<()> {
1284 let position = positions
1286 .get(&event.position_id)
1287 .expect("position must exist for execution");
1288 let current = Reserves {
1289 r1: event.reserves_1,
1290 r2: event.reserves_2,
1291 };
1292 let prev = Reserves {
1293 r1: event.prev_reserves_1,
1294 r2: event.prev_reserves_2,
1295 };
1296 let flows = Flows::from_phi_and_reserves(&position.phi, ¤t, &prev);
1297
1298 let reserves_rowid = sqlx::query_scalar::<_, i32>(
1300 "INSERT INTO dex_ex_position_reserves (
1301 position_id,
1302 height,
1303 time,
1304 reserves_1,
1305 reserves_2
1306 ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1307 )
1308 .bind(event.position_id.0)
1309 .bind(height)
1310 .bind(time)
1311 .bind(BigDecimal::from(event.reserves_1.value()))
1312 .bind(BigDecimal::from(event.reserves_2.value()))
1313 .fetch_one(dbtx.as_mut())
1314 .await?;
1315
1316 sqlx::query(
1318 "INSERT INTO dex_ex_position_executions (
1319 position_id,
1320 height,
1321 time,
1322 reserves_rowid,
1323 delta_1,
1324 delta_2,
1325 lambda_1,
1326 lambda_2,
1327 fee_1,
1328 fee_2,
1329 context_asset_start,
1330 context_asset_end
1331 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1332 )
1333 .bind(event.position_id.0)
1334 .bind(height)
1335 .bind(time)
1336 .bind(reserves_rowid)
1337 .bind(BigDecimal::from(flows.delta_1().value()))
1338 .bind(BigDecimal::from(flows.delta_2().value()))
1339 .bind(BigDecimal::from(flows.lambda_1().value()))
1340 .bind(BigDecimal::from(flows.lambda_2().value()))
1341 .bind(BigDecimal::from(flows.fee_1().value()))
1342 .bind(BigDecimal::from(flows.fee_2().value()))
1343 .bind(event.context.start.to_bytes())
1344 .bind(event.context.end.to_bytes())
1345 .execute(dbtx.as_mut())
1346 .await?;
1347
1348 Ok(())
1349 }
1350
1351 async fn record_position_close(
1352 &self,
1353 dbtx: &mut PgTransaction<'_>,
1354 time: DateTime,
1355 height: i32,
1356 tx_hash: Option<[u8; 32]>,
1357 event: &EventPositionClose,
1358 ) -> anyhow::Result<()> {
1359 sqlx::query(
1360 "UPDATE dex_ex_position_state
1361 SET closing_time = $1,
1362 closing_height = $2,
1363 closing_tx = $3
1364 WHERE position_id = $4",
1365 )
1366 .bind(time)
1367 .bind(height)
1368 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1369 .bind(event.position_id.0)
1370 .execute(dbtx.as_mut())
1371 .await?;
1372
1373 Ok(())
1374 }
1375
1376 async fn record_position_withdraw(
1377 &self,
1378 dbtx: &mut PgTransaction<'_>,
1379 time: DateTime,
1380 height: i32,
1381 tx_hash: Option<[u8; 32]>,
1382 event: &EventPositionWithdraw,
1383 ) -> anyhow::Result<()> {
1384 let reserves_rowid = sqlx::query_scalar::<_, i32>(
1386 "INSERT INTO dex_ex_position_reserves (
1387 position_id,
1388 height,
1389 time,
1390 reserves_1,
1391 reserves_2
1392 ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", )
1394 .bind(event.position_id.0)
1395 .bind(height)
1396 .bind(time)
1397 .bind(BigDecimal::from(0)) .fetch_one(dbtx.as_mut())
1399 .await?;
1400
1401 sqlx::query(
1402 "INSERT INTO dex_ex_position_withdrawals (
1403 position_id,
1404 height,
1405 time,
1406 withdrawal_tx,
1407 sequence,
1408 reserves_1,
1409 reserves_2,
1410 reserves_rowid
1411 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1412 )
1413 .bind(event.position_id.0)
1414 .bind(height)
1415 .bind(time)
1416 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1417 .bind(event.sequence as i32)
1418 .bind(BigDecimal::from(event.reserves_1.value()))
1419 .bind(BigDecimal::from(event.reserves_2.value()))
1420 .bind(reserves_rowid)
1421 .execute(dbtx.as_mut())
1422 .await?;
1423
1424 Ok(())
1425 }
1426
1427 async fn record_transaction(
1428 &self,
1429 dbtx: &mut PgTransaction<'_>,
1430 time: DateTime,
1431 height: u64,
1432 transaction_id: [u8; 32],
1433 transaction: Transaction,
1434 ) -> anyhow::Result<()> {
1435 if transaction.transaction_body.actions.is_empty() {
1436 return Ok(());
1437 }
1438 sqlx::query(
1439 "INSERT INTO dex_ex_transactions (
1440 transaction_id,
1441 transaction,
1442 height,
1443 time
1444 ) VALUES ($1, $2, $3, $4)
1445 ",
1446 )
1447 .bind(transaction_id)
1448 .bind(transaction.encode_to_vec())
1449 .bind(i32::try_from(height)?)
1450 .bind(time)
1451 .execute(dbtx.as_mut())
1452 .await?;
1453
1454 Ok(())
1455 }
1456
1457 async fn record_all_transactions(
1458 &self,
1459 dbtx: &mut PgTransaction<'_>,
1460 time: DateTime,
1461 block: &BlockEvents,
1462 ) -> anyhow::Result<()> {
1463 for (tx_id, tx_bytes) in block.transactions() {
1464 let tx = Transaction::try_from(tx_bytes)?;
1465 let height = block.height();
1466 self.record_transaction(dbtx, time, height, tx_id, tx)
1467 .await?;
1468 }
1469 Ok(())
1470 }
1471}
1472
1473#[async_trait]
1474impl AppView for Component {
1475 async fn init_chain(
1476 &self,
1477 _dbtx: &mut PgTransaction,
1478 _: &serde_json::Value,
1479 ) -> Result<(), anyhow::Error> {
1480 Ok(())
1481 }
1482
1483 fn name(&self) -> String {
1484 "dex_ex".to_string()
1485 }
1486
1487 fn version(&self) -> Version {
1488 let hash: [u8; 32] = blake2b_simd::Params::default()
1489 .personal(b"option_hash")
1490 .hash_length(32)
1491 .to_state()
1492 .update(&self.denom.to_bytes())
1493 .update(&self.min_liquidity.to_le_bytes())
1494 .update(&[u8::from(self.ignore_arb_executions)])
1495 .finalize()
1496 .as_bytes()
1497 .try_into()
1498 .expect("Impossible 000-001: expected 32 byte hash");
1499 Version::with_major(2).with_option_hash(hash)
1500 }
1501
1502 async fn reset(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1503 for statement in include_str!("reset.sql").split(";") {
1504 sqlx::query(statement).execute(dbtx.as_mut()).await?;
1505 }
1506 Ok(())
1507 }
1508
1509 async fn on_startup(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1510 for statement in include_str!("schema.sql").split(";") {
1511 sqlx::query(statement).execute(dbtx.as_mut()).await?;
1512 }
1513 Ok(())
1514 }
1515
1516 async fn index_batch(
1517 &self,
1518 dbtx: &mut PgTransaction,
1519 batch: EventBatch,
1520 ctx: EventBatchContext,
1521 ) -> Result<(), anyhow::Error> {
1522 metadata::set(dbtx, self.denom).await?;
1523 let mut charts = HashMap::new();
1524 let mut snapshots = HashMap::new();
1525 let mut last_time = None;
1526 for block in batch.events_by_block() {
1527 let mut events = Events::extract(&block, self.ignore_arb_executions)?;
1528 let time = events
1529 .time
1530 .expect(&format!("no block root event at height {}", block.height()));
1531 last_time = Some(time);
1532
1533 self.record_all_transactions(dbtx, time, block).await?;
1534
1535 events.load_positions(dbtx).await?;
1537
1538 self.record_block_summary(dbtx, time, block.height() as i32, &events)
1540 .await?;
1541
1542 for event in &events.batch_swaps {
1544 self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
1545 .await?;
1546 }
1547
1548 for event in &events.position_opens {
1550 let tx_hash = events.position_open_txs.get(&event.position_id).copied();
1551 self.record_position_open(dbtx, time, events.height, tx_hash, event)
1552 .await?;
1553 }
1554
1555 for event in &events.position_executions {
1557 self.record_position_execution(dbtx, time, events.height, event, &events.positions)
1558 .await?;
1559 }
1560
1561 for event in &events.position_closes {
1563 let tx_hash = events.position_close_txs.get(&event.position_id).copied();
1564 self.record_position_close(dbtx, time, events.height, tx_hash, event)
1565 .await?;
1566 }
1567
1568 for event in &events.position_withdrawals {
1570 let tx_hash = events
1571 .position_withdrawal_txs
1572 .get(&event.position_id)
1573 .copied();
1574 self.record_position_withdraw(dbtx, time, events.height, tx_hash, event)
1575 .await?;
1576 }
1577
1578 for (pair, candle) in &events.candles {
1579 sqlx::query(
1580 "INSERT INTO dex_ex.candles VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
1581 )
1582 .bind(i64::try_from(block.height())?)
1583 .bind(time)
1584 .bind(pair.start.to_bytes())
1585 .bind(pair.end.to_bytes())
1586 .bind(candle.open)
1587 .bind(candle.close)
1588 .bind(candle.low)
1589 .bind(candle.high)
1590 .bind(candle.direct_volume)
1591 .bind(candle.swap_volume)
1592 .execute(dbtx.as_mut())
1593 .await?;
1594 for window in Window::all() {
1595 let key = (pair.start, pair.end, window);
1596 if !charts.contains_key(&key) {
1597 let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?;
1598 charts.insert(key, ctx);
1599 }
1600 charts
1601 .get_mut(&key)
1602 .unwrap() .update(dbtx, time, *candle)
1604 .await?;
1605 }
1606 }
1607
1608 let block_pairs = events
1609 .candles
1610 .keys()
1611 .chain(events.metrics.keys())
1612 .copied()
1613 .collect::<HashSet<_>>();
1614 for pair in block_pairs {
1615 if !snapshots.contains_key(&pair) {
1616 let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?;
1617 snapshots.insert(pair, ctx);
1618 }
1619 snapshots
1621 .get_mut(&pair)
1622 .unwrap()
1623 .update(
1624 dbtx,
1625 time,
1626 events.candles.get(&pair).copied(),
1627 events.metrics.get(&pair).copied().unwrap_or_default(),
1628 events.price_for(self.denom, pair.start),
1629 )
1630 .await?;
1631 }
1632 }
1633
1634 if ctx.is_last() {
1635 if let Some(now) = last_time {
1636 for window in Window::all() {
1637 summary::update_summaries(dbtx, now, window).await?;
1638 summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity)
1639 .await?;
1640 }
1641 }
1642 }
1643 for chart in charts.into_values() {
1644 chart.unload(dbtx).await?;
1645 }
1646 Ok(())
1647 }
1648}