1use anyhow::anyhow;
2use clap::Result;
3use cometindex::{
4 async_trait,
5 index::{BlockEvents, EventBatch, EventBatchContext, Version},
6 AppView, PgTransaction,
7};
8use penumbra_sdk_asset::{asset, Value, STAKING_TOKEN_ASSET_ID};
9use penumbra_sdk_dex::{
10 event::{
11 EventArbExecution, EventBatchSwap, EventPositionClose, EventPositionExecution,
12 EventPositionOpen, EventPositionWithdraw, EventQueuePositionClose,
13 },
14 lp::{position::Flows, Reserves},
15 DirectedTradingPair, SwapExecution, TradingPair,
16};
17use penumbra_sdk_dex::{
18 event::{EventSwap, EventSwapClaim},
19 lp::position::{Id as PositionId, Position},
20};
21use penumbra_sdk_funding::event::EventLqtPositionReward;
22use penumbra_sdk_num::Amount;
23use penumbra_sdk_proto::event::EventDomainType;
24use penumbra_sdk_proto::DomainType;
25use penumbra_sdk_sct::event::EventBlockRoot;
26use penumbra_sdk_transaction::Transaction;
27use sqlx::types::BigDecimal;
28use sqlx::Row;
29use std::collections::{BTreeMap, HashMap, HashSet};
30
31type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
32
33mod candle {
34 use super::DateTime;
35 use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
36 use penumbra_sdk_dex::CandlestickData;
37 use std::fmt::Display;
38
39 #[derive(Debug, Clone, Copy)]
44 pub struct Candle {
45 pub open: f64,
46 pub close: f64,
47 pub low: f64,
48 pub high: f64,
49 pub direct_volume: f64,
50 pub swap_volume: f64,
51 }
52
53 impl Candle {
54 pub fn from_candlestick_data(data: &CandlestickData) -> Self {
55 Self {
59 open: data.open,
60 close: data.close,
61 low: data.low,
62 high: data.high,
63 direct_volume: data.direct_volume,
64 swap_volume: data.swap_volume,
65 }
66 }
67
68 pub fn point(price: f64, direct_volume: f64) -> Self {
69 Self {
70 open: price,
71 close: price,
72 low: price,
73 high: price,
74 direct_volume,
75 swap_volume: 0.0,
76 }
77 }
78
79 pub fn merge(&mut self, that: &Self) {
80 self.close = that.close;
81 self.low = self.low.min(that.low);
82 self.high = self.high.max(that.high);
83 self.direct_volume += that.direct_volume;
84 self.swap_volume += that.swap_volume;
85 }
86 }
87
88 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
89 pub enum Window {
90 W1m,
91 W15m,
92 W1h,
93 W4h,
94 W1d,
95 W1w,
96 W1mo,
97 }
98
99 impl Window {
100 pub fn all() -> impl Iterator<Item = Self> {
101 [
102 Window::W1m,
103 Window::W15m,
104 Window::W1h,
105 Window::W4h,
106 Window::W1d,
107 Window::W1w,
108 Window::W1mo,
109 ]
110 .into_iter()
111 }
112
113 pub fn anchor(&self, time: DateTime) -> DateTime {
120 let (y, mo, d, h, m) = (
121 time.year(),
122 time.month(),
123 time.day(),
124 time.hour(),
125 time.minute(),
126 );
127 let out = match self {
128 Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
129 Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
130 Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
131 Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
132 Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
133 Window::W1w => Utc
134 .with_ymd_and_hms(y, mo, d, 0, 0, 0)
135 .single()
136 .and_then(|x| {
137 x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
138 }),
139 Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
140 };
141 out.unwrap()
142 }
143
144 pub fn subtract_from(&self, time: DateTime) -> DateTime {
145 let delta = match self {
146 Window::W1m => TimeDelta::minutes(1),
147 Window::W15m => TimeDelta::minutes(15),
148 Window::W1h => TimeDelta::hours(1),
149 Window::W4h => TimeDelta::hours(4),
150 Window::W1d => TimeDelta::days(1),
151 Window::W1w => TimeDelta::weeks(1),
152 Window::W1mo => TimeDelta::days(30),
153 };
154 time - delta
155 }
156 }
157
158 impl Display for Window {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 use Window::*;
161 let str = match self {
162 W1m => "1m",
163 W15m => "15m",
164 W1h => "1h",
165 W4h => "4h",
166 W1d => "1d",
167 W1w => "1w",
168 W1mo => "1mo",
169 };
170 write!(f, "{}", str)
171 }
172 }
173
174 #[derive(Debug)]
175 pub struct WindowedCandle {
176 start: DateTime,
177 window: Window,
178 candle: Candle,
179 }
180
181 impl WindowedCandle {
182 pub fn new(now: DateTime, window: Window, candle: Candle) -> Self {
183 Self {
184 start: window.anchor(now),
185 window,
186 candle,
187 }
188 }
189
190 pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> {
195 let start = self.window.anchor(now);
196 if start > self.start {
198 let old_start = std::mem::replace(&mut self.start, start);
199 let old_candle = std::mem::replace(&mut self.candle, candle);
200 Some((old_start, old_candle))
201 } else {
202 self.candle.merge(&candle);
203 None
204 }
205 }
206
207 pub fn window(&self) -> Window {
208 self.window
209 }
210
211 pub fn flush(self) -> (DateTime, Candle) {
212 (self.start, self.candle)
213 }
214 }
215}
216pub use candle::{Candle, Window, WindowedCandle};
217
218mod price_chart {
219 use super::*;
220
221 #[derive(Debug)]
223 pub struct Context {
224 asset_start: asset::Id,
225 asset_end: asset::Id,
226 window: Window,
227 state: Option<WindowedCandle>,
228 }
229
230 impl Context {
231 pub async fn load(
232 dbtx: &mut PgTransaction<'_>,
233 asset_start: asset::Id,
234 asset_end: asset::Id,
235 window: Window,
236 ) -> anyhow::Result<Self> {
237 let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as(
238 "
239 SELECT open, close, high, low, direct_volume, swap_volume, start_time
240 FROM dex_ex_price_charts
241 WHERE asset_start = $1
242 AND asset_end = $2
243 AND the_window = $3
244 ORDER BY start_time DESC
245 LIMIT 1
246 ",
247 )
248 .bind(asset_start.to_bytes())
249 .bind(asset_end.to_bytes())
250 .bind(window.to_string())
251 .fetch_optional(dbtx.as_mut())
252 .await?;
253 let state = row.map(
254 |(open, close, high, low, direct_volume, swap_volume, start)| {
255 let candle = Candle {
256 open,
257 close,
258 low,
259 high,
260 direct_volume,
261 swap_volume,
262 };
263 WindowedCandle::new(start, window, candle)
264 },
265 );
266 Ok(Self {
267 asset_start,
268 asset_end,
269 window,
270 state,
271 })
272 }
273
274 async fn write_candle(
275 &self,
276 dbtx: &mut PgTransaction<'_>,
277 start: DateTime,
278 candle: Candle,
279 ) -> anyhow::Result<()> {
280 sqlx::query(
281 "
282 INSERT INTO dex_ex_price_charts(
283 id, asset_start, asset_end, the_window,
284 start_time, open, close, high, low, direct_volume, swap_volume
285 )
286 VALUES(
287 DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
288 )
289 ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET
290 open = EXCLUDED.open,
291 close = EXCLUDED.close,
292 high = EXCLUDED.high,
293 low = EXCLUDED.low,
294 direct_volume = EXCLUDED.direct_volume,
295 swap_volume = EXCLUDED.swap_volume
296 ",
297 )
298 .bind(self.asset_start.to_bytes())
299 .bind(self.asset_end.to_bytes())
300 .bind(self.window.to_string())
301 .bind(start)
302 .bind(candle.open)
303 .bind(candle.close)
304 .bind(candle.high)
305 .bind(candle.low)
306 .bind(candle.direct_volume)
307 .bind(candle.swap_volume)
308 .execute(dbtx.as_mut())
309 .await?;
310 Ok(())
311 }
312
313 pub async fn update(
314 &mut self,
315 dbtx: &mut PgTransaction<'_>,
316 now: DateTime,
317 candle: Candle,
318 ) -> anyhow::Result<()> {
319 let state = match self.state.as_mut() {
320 Some(x) => x,
321 None => {
322 self.state = Some(WindowedCandle::new(now, self.window, candle));
323 self.state.as_mut().unwrap()
324 }
325 };
326 if let Some((start, old_candle)) = state.with_candle(now, candle) {
327 self.write_candle(dbtx, start, old_candle).await?;
328 };
329 Ok(())
330 }
331
332 pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
333 let state = std::mem::replace(&mut self.state, None);
334 if let Some(state) = state {
335 let (start, candle) = state.flush();
336 self.write_candle(dbtx, start, candle).await?;
337 }
338 Ok(())
339 }
340 }
341}
342
343use price_chart::Context as PriceChartContext;
344
345mod summary {
346 use cometindex::PgTransaction;
347 use penumbra_sdk_asset::asset;
348
349 use super::{Candle, DateTime, PairMetrics, Window};
350
351 pub struct Context {
352 start: asset::Id,
353 end: asset::Id,
354 price: f64,
355 liquidity: f64,
356 start_price_indexing_denom: f64,
357 }
358
359 impl Context {
360 pub async fn load(
361 dbtx: &mut PgTransaction<'_>,
362 start: asset::Id,
363 end: asset::Id,
364 ) -> anyhow::Result<Self> {
365 let row: Option<(f64, f64, f64)> = sqlx::query_as(
366 "
367 SELECT price, liquidity, start_price_indexing_denom
368 FROM dex_ex_pairs_block_snapshot
369 WHERE asset_start = $1
370 AND asset_end = $2
371 ORDER BY id DESC
372 LIMIT 1
373 ",
374 )
375 .bind(start.to_bytes())
376 .bind(end.to_bytes())
377 .fetch_optional(dbtx.as_mut())
378 .await?;
379 let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
380 Ok(Self {
381 start,
382 end,
383 price,
384 liquidity,
385 start_price_indexing_denom,
386 })
387 }
388
389 pub async fn update(
390 &mut self,
391 dbtx: &mut PgTransaction<'_>,
392 now: DateTime,
393 candle: Option<Candle>,
394 metrics: PairMetrics,
395 start_price_indexing_denom: Option<f64>,
396 ) -> anyhow::Result<()> {
397 if let Some(candle) = candle {
398 self.price = candle.close;
399 }
400 if let Some(price) = start_price_indexing_denom {
401 self.start_price_indexing_denom = price;
402 }
403 self.liquidity += metrics.liquidity_change;
404
405 sqlx::query(
406 "
407 INSERT INTO dex_ex_pairs_block_snapshot VALUES (
408 DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
409 )
410 ",
411 )
412 .bind(now)
413 .bind(self.start.to_bytes())
414 .bind(self.end.to_bytes())
415 .bind(self.price)
416 .bind(self.liquidity)
417 .bind(candle.map(|x| x.direct_volume).unwrap_or_default())
418 .bind(candle.map(|x| x.swap_volume).unwrap_or_default())
419 .bind(self.start_price_indexing_denom)
420 .bind(metrics.trades)
421 .execute(dbtx.as_mut())
422 .await?;
423 Ok(())
424 }
425 }
426
427 pub async fn update_summaries(
428 dbtx: &mut PgTransaction<'_>,
429 now: DateTime,
430 window: Window,
431 ) -> anyhow::Result<()> {
432 let then = window.subtract_from(now);
433 let pairs = sqlx::query_as::<_, (Vec<u8>, Vec<u8>)>(
434 "SELECT asset_start, asset_end FROM dex_ex_pairs_block_snapshot GROUP BY asset_start, asset_end",
435 )
436 .fetch_all(dbtx.as_mut())
437 .await?;
438 for (start, end) in pairs {
439 sqlx::query(
440 "
441 WITH
442 snapshots AS (
443 SELECT *
444 FROM dex_ex_pairs_block_snapshot
445 WHERE asset_start = $1
446 AND asset_end = $2
447 ),
448 previous AS (
449 SELECT price AS price_then, liquidity AS liquidity_then
450 FROM snapshots
451 WHERE time <= $4
452 ORDER BY time DESC
453 LIMIT 1
454 ),
455 previous_or_default AS (
456 SELECT
457 COALESCE((SELECT price_then FROM previous), 0.0) AS price_then,
458 COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then
459 ),
460 now AS (
461 SELECT price, liquidity
462 FROM snapshots
463 WHERE time <= $3
464 ORDER BY time DESC
465 LIMIT 1
466 ),
467 sums AS (
468 SELECT
469 COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
470 COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
471 COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
472 COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
473 COALESCE(SUM(trades), 0.0) AS trades_over_window,
474 COALESCE(MIN(price), 0.0) AS low,
475 COALESCE(MAX(price), 0.0) AS high
476 FROM snapshots
477 WHERE time <= $3
478 AND time >= $4
479 )
480 INSERT INTO dex_ex_pairs_summary
481 SELECT
482 $1, $2, $5,
483 price, price_then,
484 low, high,
485 liquidity, liquidity_then,
486 direct_volume_over_window,
487 swap_volume_over_window,
488 direct_volume_indexing_denom_over_window,
489 swap_volume_indexing_denom_over_window,
490 trades_over_window
491 FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
492 ON CONFLICT (asset_start, asset_end, the_window)
493 DO UPDATE SET
494 price = EXCLUDED.price,
495 price_then = EXCLUDED.price_then,
496 liquidity = EXCLUDED.liquidity,
497 liquidity_then = EXCLUDED.liquidity_then,
498 direct_volume_over_window = EXCLUDED.direct_volume_over_window,
499 swap_volume_over_window = EXCLUDED.swap_volume_over_window,
500 direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
501 swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
502 trades_over_window = EXCLUDED.trades_over_window
503 ",
504 )
505 .bind(start)
506 .bind(end)
507 .bind(now)
508 .bind(then)
509 .bind(window.to_string())
510 .execute(dbtx.as_mut())
511 .await?;
512 }
513 Ok(())
514 }
515
516 pub async fn update_aggregate_summary(
517 dbtx: &mut PgTransaction<'_>,
518 window: Window,
519 denom: asset::Id,
520 min_liquidity: f64,
521 ) -> anyhow::Result<()> {
522 sqlx::query(
524 "
525 WITH
526 eligible_denoms AS (
527 SELECT asset_start as asset, price
528 FROM dex_ex_pairs_summary
529 WHERE asset_end = $1
530 AND liquidity >= $2
531 UNION VALUES ($1, 1.0)
532 ),
533 converted_pairs_summary AS (
534 SELECT
535 asset_start, asset_end,
536 (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
537 liquidity * ed_end.price AS liquidity,
538 direct_volume_indexing_denom_over_window AS dv,
539 swap_volume_indexing_denom_over_window AS sv,
540 trades_over_window as trades
541 FROM dex_ex_pairs_summary
542 JOIN eligible_denoms AS ed_end
543 ON ed_end.asset = asset_end
544 JOIN eligible_denoms AS ed_start
545 ON ed_start.asset = asset_start
546 WHERE the_window = $3
547 ),
548 sums AS (
549 SELECT
550 SUM(dv) AS direct_volume,
551 SUM(sv) AS swap_volume,
552 SUM(trades) AS trades
553 FROM converted_pairs_summary WHERE asset_start < asset_end
554 ),
555 total_liquidity AS (
556 SELECT
557 SUM(liquidity) AS liquidity
558 FROM converted_pairs_summary
559 ),
560 undirected_pairs_summary AS (
561 WITH pairs AS (
562 SELECT asset_start AS a_start, asset_end AS a_end FROM converted_pairs_summary WHERE asset_start < asset_end
563 )
564 SELECT
565 a_start AS asset_start,
566 a_end AS asset_end,
567 (SELECT SUM(sv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS sv,
568 (SELECT SUM(dv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS dv
569 FROM pairs
570 ),
571 counts AS (
572 SELECT COUNT(*) AS active_pairs FROM undirected_pairs_summary WHERE dv > 0
573 ),
574 largest_sv AS (
575 SELECT
576 asset_start AS largest_sv_trading_pair_start,
577 asset_end AS largest_sv_trading_pair_end,
578 sv AS largest_sv_trading_pair_volume
579 FROM undirected_pairs_summary
580 ORDER BY sv DESC
581 LIMIT 1
582 ),
583 largest_dv AS (
584 SELECT
585 asset_start AS largest_dv_trading_pair_start,
586 asset_end AS largest_dv_trading_pair_end,
587 dv AS largest_dv_trading_pair_volume
588 FROM undirected_pairs_summary
589 ORDER BY dv DESC
590 LIMIT 1
591 ),
592 top_price_mover AS (
593 SELECT
594 asset_start AS top_price_mover_start,
595 asset_end AS top_price_mover_end,
596 price_change AS top_price_mover_change_percent
597 FROM converted_pairs_summary
598 ORDER BY price_change DESC
599 LIMIT 1
600 )
601 INSERT INTO dex_ex_aggregate_summary
602 SELECT
603 $3,
604 direct_volume, swap_volume, liquidity, trades, active_pairs,
605 largest_sv_trading_pair_start,
606 largest_sv_trading_pair_end,
607 largest_sv_trading_pair_volume,
608 largest_dv_trading_pair_start,
609 largest_dv_trading_pair_end,
610 largest_dv_trading_pair_volume,
611 top_price_mover_start,
612 top_price_mover_end,
613 top_price_mover_change_percent
614 FROM
615 sums
616 JOIN largest_sv ON TRUE
617 JOIN largest_dv ON TRUE
618 JOIN top_price_mover ON TRUE
619 JOIN total_liquidity ON TRUE
620 JOIN counts ON TRUE
621 ON CONFLICT (the_window) DO UPDATE SET
622 direct_volume = EXCLUDED.direct_volume,
623 swap_volume = EXCLUDED.swap_volume,
624 liquidity = EXCLUDED.liquidity,
625 trades = EXCLUDED.trades,
626 active_pairs = EXCLUDED.active_pairs,
627 largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start,
628 largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end,
629 largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume,
630 largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start,
631 largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end,
632 largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume,
633 top_price_mover_start = EXCLUDED.top_price_mover_start,
634 top_price_mover_end = EXCLUDED.top_price_mover_end,
635 top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent
636 ")
637 .bind(denom.to_bytes())
638 .bind(min_liquidity)
639 .bind(window.to_string())
640 .execute(dbtx.as_mut())
641 .await?;
642 Ok(())
643 }
644}
645
646mod metadata {
647 use super::*;
648
649 pub async fn set(dbtx: &mut PgTransaction<'_>, quote_asset: asset::Id) -> anyhow::Result<()> {
650 sqlx::query(
651 "
652 INSERT INTO dex_ex_metadata
653 VALUES (1, $1)
654 ON CONFLICT (id) DO UPDATE
655 SET id = EXCLUDED.id,
656 quote_asset_id = EXCLUDED.quote_asset_id
657 ",
658 )
659 .bind(quote_asset.to_bytes())
660 .execute(dbtx.as_mut())
661 .await?;
662 Ok(())
663 }
664}
665
666#[derive(Debug, Default, Clone, Copy)]
667struct PairMetrics {
668 trades: f64,
669 liquidity_change: f64,
670}
671
672#[derive(Debug, Clone, serde::Serialize)]
673struct BatchSwapSummary {
674 asset_start: asset::Id,
675 asset_end: asset::Id,
676 input: Amount,
677 output: Amount,
678 num_swaps: i32,
679 price_float: f64,
680}
681
682#[derive(Debug)]
683struct Events {
684 time: Option<DateTime>,
685 height: i32,
686 candles: HashMap<DirectedTradingPair, Candle>,
687 metrics: HashMap<DirectedTradingPair, PairMetrics>,
688 positions: BTreeMap<PositionId, Position>,
690 position_opens: Vec<EventPositionOpen>,
692 position_executions: Vec<EventPositionExecution>,
693 position_closes: Vec<EventPositionClose>,
694 position_withdrawals: Vec<EventPositionWithdraw>,
695 batch_swaps: Vec<EventBatchSwap>,
696 swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
697 swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
698 position_open_txs: BTreeMap<PositionId, [u8; 32]>,
700 position_close_txs: BTreeMap<PositionId, [u8; 32]>,
701 position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
702}
703
704impl Events {
705 fn new() -> Self {
706 Self {
707 time: None,
708 height: 0,
709 candles: HashMap::new(),
710 metrics: HashMap::new(),
711 positions: BTreeMap::new(),
712 position_opens: Vec::new(),
713 position_executions: Vec::new(),
714 position_closes: Vec::new(),
715 position_withdrawals: Vec::new(),
716 batch_swaps: Vec::new(),
717 swaps: BTreeMap::new(),
718 swap_claims: BTreeMap::new(),
719 position_open_txs: BTreeMap::new(),
720 position_close_txs: BTreeMap::new(),
721 position_withdrawal_txs: BTreeMap::new(),
722 }
723 }
724
725 fn with_time(&mut self, time: DateTime) {
726 self.time = Some(time)
727 }
728
729 fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
730 if !self.metrics.contains_key(pair) {
731 self.metrics.insert(*pair, PairMetrics::default());
732 }
733 self.metrics.get_mut(pair).unwrap()
735 }
736
737 fn with_trace(&mut self, input: Value, output: Value) {
738 let pair_1_2 = DirectedTradingPair {
739 start: input.asset_id,
740 end: output.asset_id,
741 };
742 if pair_1_2.start == pair_1_2.end {
744 tracing::warn!(?input, ?output, "found trace with a loop?");
745 return;
746 }
747 let pair_2_1 = pair_1_2.flip();
748 let input_amount = f64::from(input.amount);
749 let output_amount = f64::from(output.amount);
750 let price_1_2 = output_amount / input_amount;
751 let candle_1_2 = Candle::point(price_1_2, input_amount);
752 let candle_2_1 = Candle::point(1.0 / price_1_2, output_amount);
753 self.metric(&pair_1_2).trades += 1.0;
754 self.candles
755 .entry(pair_1_2)
756 .and_modify(|c| c.merge(&candle_1_2))
757 .or_insert(candle_1_2);
758 self.metric(&pair_2_1).trades += 1.0;
759 self.candles
760 .entry(pair_2_1)
761 .and_modify(|c| c.merge(&candle_2_1))
762 .or_insert(candle_2_1);
763 }
764
765 fn with_swap_execution(&mut self, se: &SwapExecution) {
766 for row in se.traces.iter() {
767 for window in row.windows(2) {
768 self.with_trace(window[0], window[1]);
769 }
770 }
771 let pair_1_2 = DirectedTradingPair {
772 start: se.input.asset_id,
773 end: se.output.asset_id,
774 };
775 if pair_1_2.start == pair_1_2.end {
778 return;
779 }
780 let pair_2_1 = pair_1_2.flip();
781 self.candles
782 .entry(pair_1_2)
783 .and_modify(|c| c.swap_volume += f64::from(se.input.amount));
784 self.candles
785 .entry(pair_2_1)
786 .and_modify(|c| c.swap_volume += f64::from(se.output.amount));
787 }
788
789 fn with_reserve_change(
790 &mut self,
791 pair: &TradingPair,
792 old_reserves: Option<Reserves>,
793 new_reserves: Reserves,
794 removed: bool,
795 ) {
796 let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
797 (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
798 (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
799 (_, Some(old), new) => (
800 (new.r1.value() as f64) - (old.r1.value() as f64),
801 (new.r2.value() as f64) - (old.r2.value() as f64),
802 ),
803 };
804 for (d_pair, diff) in [
805 (
806 DirectedTradingPair {
807 start: pair.asset_1(),
808 end: pair.asset_2(),
809 },
810 diff_2,
811 ),
812 (
813 DirectedTradingPair {
814 start: pair.asset_2(),
815 end: pair.asset_1(),
816 },
817 diff_1,
818 ),
819 ] {
820 self.metric(&d_pair).liquidity_change += diff;
821 }
822 }
823
824 pub fn extract(block: &BlockEvents) -> anyhow::Result<Self> {
825 let mut out = Self::new();
826 out.height = block.height() as i32;
827
828 for event in block.events() {
829 if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
830 let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
831 "creating timestamp should succeed; timestamp: {}",
832 e.timestamp_seconds
833 ))?;
834 out.with_time(time);
835 } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
836 out.with_reserve_change(
837 &e.trading_pair,
838 None,
839 Reserves {
840 r1: e.reserves_1,
841 r2: e.reserves_2,
842 },
843 false,
844 );
845 if let Some(tx_hash) = event.tx_hash() {
846 out.position_open_txs.insert(e.position_id, tx_hash);
847 }
848 out.positions.insert(e.position_id, e.position.clone());
852 out.position_opens.push(e);
853 } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
854 out.with_reserve_change(
857 &e.trading_pair,
858 None,
859 Reserves {
860 r1: e.reserves_1,
861 r2: e.reserves_2,
862 },
863 true,
864 );
865 if let Some(tx_hash) = event.tx_hash() {
866 out.position_withdrawal_txs.insert(e.position_id, tx_hash);
867 }
868 out.position_withdrawals.push(e);
869 } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
870 out.with_reserve_change(
871 &e.trading_pair,
872 Some(Reserves {
873 r1: e.prev_reserves_1,
874 r2: e.prev_reserves_2,
875 }),
876 Reserves {
877 r1: e.reserves_1,
878 r2: e.reserves_2,
879 },
880 false,
881 );
882 out.position_executions.push(e);
883 } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) {
884 out.position_closes.push(e);
885 } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
886 if let Some(tx_hash) = event.tx_hash() {
889 out.position_close_txs.insert(e.position_id, tx_hash);
890 }
891 } else if let Ok(e) = EventSwap::try_from_event(&event.event) {
892 out.swaps
893 .entry(e.trading_pair)
894 .or_insert_with(Vec::new)
895 .push(e);
896 } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
897 out.swap_claims
898 .entry(e.trading_pair)
899 .or_insert_with(Vec::new)
900 .push(e);
901 } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
902 let pair = DirectedTradingPair {
903 start: e.incentivized_asset_id,
904 end: *STAKING_TOKEN_ASSET_ID,
905 };
906 out.metric(&pair).liquidity_change += e.reward_amount.value() as f64;
907 } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
908 if let Some(se) = e.swap_execution_1_for_2.as_ref() {
910 out.with_swap_execution(se);
911 }
912 if let Some(se) = e.swap_execution_2_for_1.as_ref() {
913 out.with_swap_execution(se);
914 }
915 out.batch_swaps.push(e);
916 } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) {
917 out.with_swap_execution(&e.swap_execution);
918 }
919 }
920 Ok(out)
921 }
922
923 async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
924 let missing_positions: Vec<_> = self
926 .position_executions
927 .iter()
928 .map(|e| e.position_id)
929 .filter(|id| !self.positions.contains_key(id))
930 .collect();
931
932 if missing_positions.is_empty() {
933 return Ok(());
934 }
935
936 let rows = sqlx::query(
938 "SELECT position_raw
939 FROM dex_ex_position_state
940 WHERE position_id = ANY($1)",
941 )
942 .bind(
943 &missing_positions
944 .iter()
945 .map(|id| id.0.as_ref())
946 .collect::<Vec<_>>(),
947 )
948 .fetch_all(dbtx.as_mut())
949 .await?;
950
951 for row in rows {
953 let position_raw: Vec<u8> = row.get("position_raw");
954 let position = Position::decode(position_raw.as_slice())?;
955 self.positions.insert(position.id(), position);
956 }
957
958 Ok(())
959 }
960
961 pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
963 self.candles
964 .get(&DirectedTradingPair::new(asset, indexing_denom))
965 .map(|x| x.close)
966 }
967}
968
969#[derive(Debug)]
970pub struct Component {
971 denom: asset::Id,
972 min_liquidity: f64,
973}
974
975impl Component {
976 pub fn new(denom: asset::Id, min_liquidity: f64) -> Self {
977 Self {
978 denom,
979 min_liquidity,
980 }
981 }
982
983 async fn record_position_open(
984 &self,
985 dbtx: &mut PgTransaction<'_>,
986 time: DateTime,
987 height: i32,
988 tx_hash: Option<[u8; 32]>,
989 event: &EventPositionOpen,
990 ) -> anyhow::Result<()> {
991 let effective_price_1_to_2: f64 = event
993 .position
994 .phi
995 .orient_start(event.trading_pair.asset_1())
996 .expect("position trading pair matches")
997 .effective_price()
998 .into();
999
1000 let effective_price_2_to_1: f64 = event
1001 .position
1002 .phi
1003 .orient_start(event.trading_pair.asset_2())
1004 .expect("position trading pair matches")
1005 .effective_price()
1006 .into();
1007
1008 let opening_reserves_rowid = sqlx::query_scalar::<_, i32>(
1010 "INSERT INTO dex_ex_position_reserves (
1011 position_id,
1012 height,
1013 time,
1014 reserves_1,
1015 reserves_2
1016 ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1017 )
1018 .bind(event.position_id.0)
1019 .bind(height)
1020 .bind(time)
1021 .bind(BigDecimal::from(event.reserves_1.value()))
1022 .bind(BigDecimal::from(event.reserves_2.value()))
1023 .fetch_one(dbtx.as_mut())
1024 .await?;
1025
1026 sqlx::query(
1028 "INSERT INTO dex_ex_position_state (
1029 position_id,
1030 asset_1,
1031 asset_2,
1032 p,
1033 q,
1034 close_on_fill,
1035 fee_bps,
1036 effective_price_1_to_2,
1037 effective_price_2_to_1,
1038 position_raw,
1039 opening_time,
1040 opening_height,
1041 opening_tx,
1042 opening_reserves_rowid
1043 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1044 )
1045 .bind(event.position_id.0)
1046 .bind(event.trading_pair.asset_1().to_bytes())
1047 .bind(event.trading_pair.asset_2().to_bytes())
1048 .bind(BigDecimal::from(event.position.phi.component.p.value()))
1049 .bind(BigDecimal::from(event.position.phi.component.q.value()))
1050 .bind(event.position.close_on_fill)
1051 .bind(event.trading_fee as i32)
1052 .bind(effective_price_1_to_2)
1053 .bind(effective_price_2_to_1)
1054 .bind(event.position.encode_to_vec())
1055 .bind(time)
1056 .bind(height)
1057 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1058 .bind(opening_reserves_rowid)
1059 .execute(dbtx.as_mut())
1060 .await?;
1061
1062 Ok(())
1063 }
1064
1065 async fn record_swap_execution_traces(
1066 &self,
1067 dbtx: &mut PgTransaction<'_>,
1068 time: DateTime,
1069 height: i32,
1070 swap_execution: &SwapExecution,
1071 ) -> anyhow::Result<()> {
1072 let SwapExecution {
1073 traces,
1074 input: se_input,
1075 output: se_output,
1076 } = swap_execution;
1077
1078 let asset_start = se_input.asset_id;
1079 let asset_end = se_output.asset_id;
1080 let batch_input = se_input.amount;
1081 let batch_output = se_output.amount;
1082
1083 for trace in traces.iter() {
1084 let Some(input_value) = trace.first() else {
1085 continue;
1086 };
1087 let Some(output_value) = trace.last() else {
1088 continue;
1089 };
1090
1091 let input = input_value.amount;
1092 let output = output_value.amount;
1093
1094 let price_float = (output.value() as f64) / (input.value() as f64);
1095 let amount_hops = trace
1096 .iter()
1097 .map(|x| BigDecimal::from(x.amount.value()))
1098 .collect::<Vec<_>>();
1099 let position_id_hops: Vec<[u8; 32]> = vec![];
1100 let asset_hops = trace
1101 .iter()
1102 .map(|x| x.asset_id.to_bytes())
1103 .collect::<Vec<_>>();
1104
1105 sqlx::query(
1106 "INSERT INTO dex_ex_batch_swap_traces (
1107 height,
1108 time,
1109 input,
1110 output,
1111 batch_input,
1112 batch_output,
1113 price_float,
1114 asset_start,
1115 asset_end,
1116 asset_hops,
1117 amount_hops,
1118 position_id_hops
1119 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1120 )
1121 .bind(height)
1122 .bind(time)
1123 .bind(BigDecimal::from(input.value()))
1124 .bind(BigDecimal::from(output.value()))
1125 .bind(BigDecimal::from(batch_input.value()))
1126 .bind(BigDecimal::from(batch_output.value()))
1127 .bind(price_float)
1128 .bind(asset_start.to_bytes())
1129 .bind(asset_end.to_bytes())
1130 .bind(asset_hops)
1131 .bind(amount_hops)
1132 .bind(position_id_hops)
1133 .execute(dbtx.as_mut())
1134 .await?;
1135 }
1136
1137 Ok(())
1138 }
1139
1140 async fn record_block_summary(
1141 &self,
1142 dbtx: &mut PgTransaction<'_>,
1143 time: DateTime,
1144 height: i32,
1145 events: &Events,
1146 ) -> anyhow::Result<()> {
1147 let num_opened_lps = events.position_opens.len() as i32;
1148 let num_closed_lps = events.position_closes.len() as i32;
1149 let num_withdrawn_lps = events.position_withdrawals.len() as i32;
1150 let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
1151 let num_swap_claims = events
1152 .swap_claims
1153 .iter()
1154 .map(|(_, v)| v.len())
1155 .sum::<usize>() as i32;
1156 let num_txs = events.batch_swaps.len() as i32;
1157
1158 let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();
1159
1160 for event in &events.batch_swaps {
1161 let trading_pair = event.batch_swap_output_data.trading_pair;
1162
1163 if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
1164 let asset_start = swap_1_2.input.asset_id;
1165 let asset_end = swap_1_2.output.asset_id;
1166 let input = swap_1_2.input.amount;
1167 let output = swap_1_2.output.amount;
1168 let price_float = (output.value() as f64) / (input.value() as f64);
1169
1170 let empty_vec = vec![];
1171 let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1172 let filtered_swaps: Vec<_> = swaps_for_pair
1173 .iter()
1174 .filter(|swap| swap.delta_1_i != Amount::zero())
1175 .collect::<Vec<_>>();
1176 let num_swaps = filtered_swaps.len() as i32;
1177
1178 batch_swap_summaries.push(BatchSwapSummary {
1179 asset_start,
1180 asset_end,
1181 input,
1182 output,
1183 num_swaps,
1184 price_float,
1185 });
1186 }
1187
1188 if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
1189 let asset_start = swap_2_1.input.asset_id;
1190 let asset_end = swap_2_1.output.asset_id;
1191 let input = swap_2_1.input.amount;
1192 let output = swap_2_1.output.amount;
1193 let price_float = (output.value() as f64) / (input.value() as f64);
1194
1195 let empty_vec = vec![];
1196 let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1197 let filtered_swaps: Vec<_> = swaps_for_pair
1198 .iter()
1199 .filter(|swap| swap.delta_2_i != Amount::zero())
1200 .collect::<Vec<_>>();
1201 let num_swaps = filtered_swaps.len() as i32;
1202
1203 batch_swap_summaries.push(BatchSwapSummary {
1204 asset_start,
1205 asset_end,
1206 input,
1207 output,
1208 num_swaps,
1209 price_float,
1210 });
1211 }
1212 }
1213
1214 sqlx::query(
1215 "INSERT INTO dex_ex_block_summary (
1216 height,
1217 time,
1218 batch_swaps,
1219 num_open_lps,
1220 num_closed_lps,
1221 num_withdrawn_lps,
1222 num_swaps,
1223 num_swap_claims,
1224 num_txs
1225 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
1226 )
1227 .bind(height)
1228 .bind(time)
1229 .bind(serde_json::to_value(&batch_swap_summaries)?)
1230 .bind(num_opened_lps)
1231 .bind(num_closed_lps)
1232 .bind(num_withdrawn_lps)
1233 .bind(num_swaps)
1234 .bind(num_swap_claims)
1235 .bind(num_txs)
1236 .execute(dbtx.as_mut())
1237 .await?;
1238
1239 Ok(())
1240 }
1241
1242 async fn record_batch_swap_traces(
1243 &self,
1244 dbtx: &mut PgTransaction<'_>,
1245 time: DateTime,
1246 height: i32,
1247 event: &EventBatchSwap,
1248 ) -> anyhow::Result<()> {
1249 let EventBatchSwap {
1250 batch_swap_output_data: _,
1251 swap_execution_1_for_2,
1252 swap_execution_2_for_1,
1253 } = event;
1254
1255 if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
1256 self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
1257 .await?;
1258 }
1259
1260 if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
1261 self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
1262 .await?;
1263 }
1264
1265 Ok(())
1266 }
1267
1268 async fn record_position_execution(
1269 &self,
1270 dbtx: &mut PgTransaction<'_>,
1271 time: DateTime,
1272 height: i32,
1273 event: &EventPositionExecution,
1274 positions: &BTreeMap<PositionId, Position>,
1275 ) -> anyhow::Result<()> {
1276 let position = positions
1278 .get(&event.position_id)
1279 .expect("position must exist for execution");
1280 let current = Reserves {
1281 r1: event.reserves_1,
1282 r2: event.reserves_2,
1283 };
1284 let prev = Reserves {
1285 r1: event.prev_reserves_1,
1286 r2: event.prev_reserves_2,
1287 };
1288 let flows = Flows::from_phi_and_reserves(&position.phi, ¤t, &prev);
1289
1290 let reserves_rowid = sqlx::query_scalar::<_, i32>(
1292 "INSERT INTO dex_ex_position_reserves (
1293 position_id,
1294 height,
1295 time,
1296 reserves_1,
1297 reserves_2
1298 ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1299 )
1300 .bind(event.position_id.0)
1301 .bind(height)
1302 .bind(time)
1303 .bind(BigDecimal::from(event.reserves_1.value()))
1304 .bind(BigDecimal::from(event.reserves_2.value()))
1305 .fetch_one(dbtx.as_mut())
1306 .await?;
1307
1308 sqlx::query(
1310 "INSERT INTO dex_ex_position_executions (
1311 position_id,
1312 height,
1313 time,
1314 reserves_rowid,
1315 delta_1,
1316 delta_2,
1317 lambda_1,
1318 lambda_2,
1319 fee_1,
1320 fee_2,
1321 context_asset_start,
1322 context_asset_end
1323 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1324 )
1325 .bind(event.position_id.0)
1326 .bind(height)
1327 .bind(time)
1328 .bind(reserves_rowid)
1329 .bind(BigDecimal::from(flows.delta_1().value()))
1330 .bind(BigDecimal::from(flows.delta_2().value()))
1331 .bind(BigDecimal::from(flows.lambda_1().value()))
1332 .bind(BigDecimal::from(flows.lambda_2().value()))
1333 .bind(BigDecimal::from(flows.fee_1().value()))
1334 .bind(BigDecimal::from(flows.fee_2().value()))
1335 .bind(event.context.start.to_bytes())
1336 .bind(event.context.end.to_bytes())
1337 .execute(dbtx.as_mut())
1338 .await?;
1339
1340 Ok(())
1341 }
1342
1343 async fn record_position_close(
1344 &self,
1345 dbtx: &mut PgTransaction<'_>,
1346 time: DateTime,
1347 height: i32,
1348 tx_hash: Option<[u8; 32]>,
1349 event: &EventPositionClose,
1350 ) -> anyhow::Result<()> {
1351 sqlx::query(
1352 "UPDATE dex_ex_position_state
1353 SET closing_time = $1,
1354 closing_height = $2,
1355 closing_tx = $3
1356 WHERE position_id = $4",
1357 )
1358 .bind(time)
1359 .bind(height)
1360 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1361 .bind(event.position_id.0)
1362 .execute(dbtx.as_mut())
1363 .await?;
1364
1365 Ok(())
1366 }
1367
1368 async fn record_position_withdraw(
1369 &self,
1370 dbtx: &mut PgTransaction<'_>,
1371 time: DateTime,
1372 height: i32,
1373 tx_hash: Option<[u8; 32]>,
1374 event: &EventPositionWithdraw,
1375 ) -> anyhow::Result<()> {
1376 let reserves_rowid = sqlx::query_scalar::<_, i32>(
1378 "INSERT INTO dex_ex_position_reserves (
1379 position_id,
1380 height,
1381 time,
1382 reserves_1,
1383 reserves_2
1384 ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", )
1386 .bind(event.position_id.0)
1387 .bind(height)
1388 .bind(time)
1389 .bind(BigDecimal::from(0)) .fetch_one(dbtx.as_mut())
1391 .await?;
1392
1393 sqlx::query(
1394 "INSERT INTO dex_ex_position_withdrawals (
1395 position_id,
1396 height,
1397 time,
1398 withdrawal_tx,
1399 sequence,
1400 reserves_1,
1401 reserves_2,
1402 reserves_rowid
1403 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1404 )
1405 .bind(event.position_id.0)
1406 .bind(height)
1407 .bind(time)
1408 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1409 .bind(event.sequence as i32)
1410 .bind(BigDecimal::from(event.reserves_1.value()))
1411 .bind(BigDecimal::from(event.reserves_2.value()))
1412 .bind(reserves_rowid)
1413 .execute(dbtx.as_mut())
1414 .await?;
1415
1416 Ok(())
1417 }
1418
1419 async fn record_transaction(
1420 &self,
1421 dbtx: &mut PgTransaction<'_>,
1422 time: DateTime,
1423 height: u64,
1424 transaction_id: [u8; 32],
1425 transaction: Transaction,
1426 ) -> anyhow::Result<()> {
1427 if transaction.transaction_body.actions.is_empty() {
1428 return Ok(());
1429 }
1430 sqlx::query(
1431 "INSERT INTO dex_ex_transactions (
1432 transaction_id,
1433 transaction,
1434 height,
1435 time
1436 ) VALUES ($1, $2, $3, $4)
1437 ",
1438 )
1439 .bind(transaction_id)
1440 .bind(transaction.encode_to_vec())
1441 .bind(i32::try_from(height)?)
1442 .bind(time)
1443 .execute(dbtx.as_mut())
1444 .await?;
1445
1446 Ok(())
1447 }
1448
1449 async fn record_all_transactions(
1450 &self,
1451 dbtx: &mut PgTransaction<'_>,
1452 time: DateTime,
1453 block: &BlockEvents,
1454 ) -> anyhow::Result<()> {
1455 for (tx_id, tx_bytes) in block.transactions() {
1456 let tx = Transaction::try_from(tx_bytes)?;
1457 let height = block.height();
1458 self.record_transaction(dbtx, time, height, tx_id, tx)
1459 .await?;
1460 }
1461 Ok(())
1462 }
1463}
1464
1465#[async_trait]
1466impl AppView for Component {
1467 async fn init_chain(
1468 &self,
1469 _dbtx: &mut PgTransaction,
1470 _: &serde_json::Value,
1471 ) -> Result<(), anyhow::Error> {
1472 Ok(())
1473 }
1474
1475 fn name(&self) -> String {
1476 "dex_ex".to_string()
1477 }
1478
1479 fn version(&self) -> Version {
1480 Version::with_major(1)
1481 }
1482
1483 async fn reset(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1484 for statement in include_str!("reset.sql").split(";") {
1485 sqlx::query(statement).execute(dbtx.as_mut()).await?;
1486 }
1487 Ok(())
1488 }
1489
1490 async fn on_startup(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1491 for statement in include_str!("schema.sql").split(";") {
1492 sqlx::query(statement).execute(dbtx.as_mut()).await?;
1493 }
1494 Ok(())
1495 }
1496
1497 async fn index_batch(
1498 &self,
1499 dbtx: &mut PgTransaction,
1500 batch: EventBatch,
1501 ctx: EventBatchContext,
1502 ) -> Result<(), anyhow::Error> {
1503 metadata::set(dbtx, self.denom).await?;
1504 let mut charts = HashMap::new();
1505 let mut snapshots = HashMap::new();
1506 let mut last_time = None;
1507 for block in batch.events_by_block() {
1508 let mut events = Events::extract(&block)?;
1509 let time = events
1510 .time
1511 .expect(&format!("no block root event at height {}", block.height()));
1512 last_time = Some(time);
1513
1514 self.record_all_transactions(dbtx, time, block).await?;
1515
1516 events.load_positions(dbtx).await?;
1518
1519 self.record_block_summary(dbtx, time, block.height() as i32, &events)
1521 .await?;
1522
1523 for event in &events.batch_swaps {
1525 self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
1526 .await?;
1527 }
1528
1529 for event in &events.position_opens {
1531 let tx_hash = events.position_open_txs.get(&event.position_id).copied();
1532 self.record_position_open(dbtx, time, events.height, tx_hash, event)
1533 .await?;
1534 }
1535
1536 for event in &events.position_executions {
1538 self.record_position_execution(dbtx, time, events.height, event, &events.positions)
1539 .await?;
1540 }
1541
1542 for event in &events.position_closes {
1544 let tx_hash = events.position_close_txs.get(&event.position_id).copied();
1545 self.record_position_close(dbtx, time, events.height, tx_hash, event)
1546 .await?;
1547 }
1548
1549 for event in &events.position_withdrawals {
1551 let tx_hash = events
1552 .position_withdrawal_txs
1553 .get(&event.position_id)
1554 .copied();
1555 self.record_position_withdraw(dbtx, time, events.height, tx_hash, event)
1556 .await?;
1557 }
1558
1559 for (pair, candle) in &events.candles {
1560 sqlx::query(
1561 "INSERT INTO dex_ex.candles VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
1562 )
1563 .bind(i64::try_from(block.height())?)
1564 .bind(time)
1565 .bind(pair.start.to_bytes())
1566 .bind(pair.end.to_bytes())
1567 .bind(candle.open)
1568 .bind(candle.close)
1569 .bind(candle.low)
1570 .bind(candle.high)
1571 .bind(candle.direct_volume)
1572 .bind(candle.swap_volume)
1573 .execute(dbtx.as_mut())
1574 .await?;
1575 for window in Window::all() {
1576 let key = (pair.start, pair.end, window);
1577 if !charts.contains_key(&key) {
1578 let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?;
1579 charts.insert(key, ctx);
1580 }
1581 charts
1582 .get_mut(&key)
1583 .unwrap() .update(dbtx, time, *candle)
1585 .await?;
1586 }
1587 }
1588
1589 let block_pairs = events
1590 .candles
1591 .keys()
1592 .chain(events.metrics.keys())
1593 .copied()
1594 .collect::<HashSet<_>>();
1595 for pair in block_pairs {
1596 if !snapshots.contains_key(&pair) {
1597 let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?;
1598 snapshots.insert(pair, ctx);
1599 }
1600 snapshots
1602 .get_mut(&pair)
1603 .unwrap()
1604 .update(
1605 dbtx,
1606 time,
1607 events.candles.get(&pair).copied(),
1608 events.metrics.get(&pair).copied().unwrap_or_default(),
1609 events.price_for(self.denom, pair.start),
1610 )
1611 .await?;
1612 }
1613 }
1614
1615 if ctx.is_last() {
1616 if let Some(now) = last_time {
1617 for window in Window::all() {
1618 summary::update_summaries(dbtx, now, window).await?;
1619 summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity)
1620 .await?;
1621 }
1622 }
1623 }
1624 for chart in charts.into_values() {
1625 chart.unload(dbtx).await?;
1626 }
1627 Ok(())
1628 }
1629}