1use anyhow::anyhow;
2use clap::Result;
3use cometindex::{
4 async_trait,
5 index::{BlockEvents, EventBatch, EventBatchContext, Version},
6 AppView, PgTransaction,
7};
8use penumbra_sdk_asset::{asset, Value, STAKING_TOKEN_ASSET_ID};
9use penumbra_sdk_dex::{
10 event::{
11 EventArbExecution, EventBatchSwap, EventPositionClose, EventPositionExecution,
12 EventPositionOpen, EventPositionWithdraw, EventQueuePositionClose,
13 },
14 lp::{position::Flows, Reserves},
15 DirectedTradingPair, SwapExecution, TradingPair,
16};
17use penumbra_sdk_dex::{
18 event::{EventSwap, EventSwapClaim},
19 lp::position::{Id as PositionId, Position},
20};
21use penumbra_sdk_funding::event::EventLqtPositionReward;
22use penumbra_sdk_num::Amount;
23use penumbra_sdk_proto::event::EventDomainType;
24use penumbra_sdk_proto::DomainType;
25use penumbra_sdk_sct::event::EventBlockRoot;
26use penumbra_sdk_transaction::Transaction;
27use sqlx::types::BigDecimal;
28use sqlx::Row;
29use std::collections::{BTreeMap, HashMap, HashSet};
30
/// UTC timestamp type used throughout this file (the `chrono` types re-exported by `sqlx`).
type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
32
33mod candle {
34 use super::DateTime;
35 use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
36 use penumbra_sdk_dex::CandlestickData;
37 use std::fmt::Display;
38
39 #[derive(Debug, Clone, Copy)]
44 pub struct Candle {
45 pub open: f64,
46 pub close: f64,
47 pub low: f64,
48 pub high: f64,
49 pub direct_volume: f64,
50 pub swap_volume: f64,
51 }
52
53 impl Candle {
54 pub fn from_candlestick_data(data: &CandlestickData) -> Self {
55 Self {
59 open: data.open,
60 close: data.close,
61 low: data.low,
62 high: data.high,
63 direct_volume: data.direct_volume,
64 swap_volume: data.swap_volume,
65 }
66 }
67
68 pub fn point(price: f64, direct_volume: f64) -> Self {
69 Self {
70 open: price,
71 close: price,
72 low: price,
73 high: price,
74 direct_volume,
75 swap_volume: 0.0,
76 }
77 }
78
79 pub fn merge(&mut self, that: &Self) {
80 self.close = that.close;
81 self.low = self.low.min(that.low);
82 self.high = self.high.max(that.high);
83 self.direct_volume += that.direct_volume;
84 self.swap_volume += that.swap_volume;
85 }
86 }
87
88 #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
89 pub enum Window {
90 W1m,
91 W15m,
92 W1h,
93 W4h,
94 W1d,
95 W1w,
96 W1mo,
97 }
98
99 impl Window {
100 pub fn all() -> impl Iterator<Item = Self> {
101 [
102 Window::W1m,
103 Window::W15m,
104 Window::W1h,
105 Window::W4h,
106 Window::W1d,
107 Window::W1w,
108 Window::W1mo,
109 ]
110 .into_iter()
111 }
112
113 pub fn anchor(&self, time: DateTime) -> DateTime {
120 let (y, mo, d, h, m) = (
121 time.year(),
122 time.month(),
123 time.day(),
124 time.hour(),
125 time.minute(),
126 );
127 let out = match self {
128 Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
129 Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
130 Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
131 Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
132 Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
133 Window::W1w => Utc
134 .with_ymd_and_hms(y, mo, d, 0, 0, 0)
135 .single()
136 .and_then(|x| {
137 x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
138 }),
139 Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
140 };
141 out.unwrap()
142 }
143
144 pub fn subtract_from(&self, time: DateTime) -> DateTime {
145 let delta = match self {
146 Window::W1m => TimeDelta::minutes(1),
147 Window::W15m => TimeDelta::minutes(15),
148 Window::W1h => TimeDelta::hours(1),
149 Window::W4h => TimeDelta::hours(4),
150 Window::W1d => TimeDelta::days(1),
151 Window::W1w => TimeDelta::weeks(1),
152 Window::W1mo => TimeDelta::days(30),
153 };
154 time - delta
155 }
156 }
157
158 impl Display for Window {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 use Window::*;
161 let str = match self {
162 W1m => "1m",
163 W15m => "15m",
164 W1h => "1h",
165 W4h => "4h",
166 W1d => "1d",
167 W1w => "1w",
168 W1mo => "1mo",
169 };
170 write!(f, "{}", str)
171 }
172 }
173
174 #[derive(Debug)]
175 pub struct WindowedCandle {
176 start: DateTime,
177 window: Window,
178 candle: Candle,
179 }
180
181 impl WindowedCandle {
182 pub fn new(now: DateTime, window: Window, candle: Candle) -> Self {
183 Self {
184 start: window.anchor(now),
185 window,
186 candle,
187 }
188 }
189
190 pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> {
195 let start = self.window.anchor(now);
196 if start > self.start {
198 let old_start = std::mem::replace(&mut self.start, start);
199 let old_candle = std::mem::replace(&mut self.candle, candle);
200 Some((old_start, old_candle))
201 } else {
202 self.candle.merge(&candle);
203 None
204 }
205 }
206
207 pub fn window(&self) -> Window {
208 self.window
209 }
210
211 pub fn flush(self) -> (DateTime, Candle) {
212 (self.start, self.candle)
213 }
214 }
215}
216pub use candle::{Candle, Window, WindowedCandle};
217
218mod price_chart {
219 use super::*;
220
    /// In-memory state for one price chart, keyed by a directed asset pair and a window.
    #[derive(Debug)]
    pub struct Context {
        /// Start asset of the directed pair.
        asset_start: asset::Id,
        /// End asset of the directed pair.
        asset_end: asset::Id,
        /// The window this chart aggregates over.
        window: Window,
        /// The candle currently being accumulated, if any has been loaded or observed.
        state: Option<WindowedCandle>,
    }
229
230 impl Context {
231 pub async fn load(
232 dbtx: &mut PgTransaction<'_>,
233 asset_start: asset::Id,
234 asset_end: asset::Id,
235 window: Window,
236 ) -> anyhow::Result<Self> {
237 let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as(
238 "
239 SELECT open, close, high, low, direct_volume, swap_volume, start_time
240 FROM dex_ex_price_charts
241 WHERE asset_start = $1
242 AND asset_end = $2
243 AND the_window = $3
244 ORDER BY start_time DESC
245 LIMIT 1
246 ",
247 )
248 .bind(asset_start.to_bytes())
249 .bind(asset_end.to_bytes())
250 .bind(window.to_string())
251 .fetch_optional(dbtx.as_mut())
252 .await?;
253 let state = row.map(
254 |(open, close, high, low, direct_volume, swap_volume, start)| {
255 let candle = Candle {
256 open,
257 close,
258 low,
259 high,
260 direct_volume,
261 swap_volume,
262 };
263 WindowedCandle::new(start, window, candle)
264 },
265 );
266 Ok(Self {
267 asset_start,
268 asset_end,
269 window,
270 state,
271 })
272 }
273
274 async fn write_candle(
275 &self,
276 dbtx: &mut PgTransaction<'_>,
277 start: DateTime,
278 candle: Candle,
279 ) -> anyhow::Result<()> {
280 sqlx::query(
281 "
282 INSERT INTO dex_ex_price_charts(
283 id, asset_start, asset_end, the_window,
284 start_time, open, close, high, low, direct_volume, swap_volume
285 )
286 VALUES(
287 DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
288 )
289 ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET
290 open = EXCLUDED.open,
291 close = EXCLUDED.close,
292 high = EXCLUDED.high,
293 low = EXCLUDED.low,
294 direct_volume = EXCLUDED.direct_volume,
295 swap_volume = EXCLUDED.swap_volume
296 ",
297 )
298 .bind(self.asset_start.to_bytes())
299 .bind(self.asset_end.to_bytes())
300 .bind(self.window.to_string())
301 .bind(start)
302 .bind(candle.open)
303 .bind(candle.close)
304 .bind(candle.high)
305 .bind(candle.low)
306 .bind(candle.direct_volume)
307 .bind(candle.swap_volume)
308 .execute(dbtx.as_mut())
309 .await?;
310 Ok(())
311 }
312
313 pub async fn update(
314 &mut self,
315 dbtx: &mut PgTransaction<'_>,
316 now: DateTime,
317 candle: Candle,
318 ) -> anyhow::Result<()> {
319 let state = match self.state.as_mut() {
320 Some(x) => x,
321 None => {
322 self.state = Some(WindowedCandle::new(now, self.window, candle));
323 self.state.as_mut().unwrap()
324 }
325 };
326 if let Some((start, old_candle)) = state.with_candle(now, candle) {
327 self.write_candle(dbtx, start, old_candle).await?;
328 };
329 Ok(())
330 }
331
332 pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
333 let state = std::mem::replace(&mut self.state, None);
334 if let Some(state) = state {
335 let (start, candle) = state.flush();
336 self.write_candle(dbtx, start, candle).await?;
337 }
338 Ok(())
339 }
340 }
341}
342
343use price_chart::Context as PriceChartContext;
344
345mod summary {
346 use cometindex::PgTransaction;
347 use penumbra_sdk_asset::asset;
348
349 use super::{Candle, DateTime, PairMetrics, Window};
350
351 pub struct Context {
352 start: asset::Id,
353 end: asset::Id,
354 price: f64,
355 liquidity: f64,
356 start_price_indexing_denom: f64,
357 }
358
359 impl Context {
360 pub async fn load(
361 dbtx: &mut PgTransaction<'_>,
362 start: asset::Id,
363 end: asset::Id,
364 ) -> anyhow::Result<Self> {
365 let row: Option<(f64, f64, f64)> = sqlx::query_as(
366 "
367 SELECT price, liquidity, start_price_indexing_denom
368 FROM dex_ex_pairs_block_snapshot
369 WHERE asset_start = $1
370 AND asset_end = $2
371 ORDER BY id DESC
372 LIMIT 1
373 ",
374 )
375 .bind(start.to_bytes())
376 .bind(end.to_bytes())
377 .fetch_optional(dbtx.as_mut())
378 .await?;
379 let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
380 Ok(Self {
381 start,
382 end,
383 price,
384 liquidity,
385 start_price_indexing_denom,
386 })
387 }
388
389 pub async fn update(
390 &mut self,
391 dbtx: &mut PgTransaction<'_>,
392 now: DateTime,
393 candle: Option<Candle>,
394 metrics: PairMetrics,
395 start_price_indexing_denom: Option<f64>,
396 ) -> anyhow::Result<()> {
397 if let Some(candle) = candle {
398 self.price = candle.close;
399 }
400 if let Some(price) = start_price_indexing_denom {
401 self.start_price_indexing_denom = price;
402 }
403 self.liquidity += metrics.liquidity_change;
404
405 sqlx::query(
406 "
407 INSERT INTO dex_ex_pairs_block_snapshot VALUES (
408 DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
409 )
410 ",
411 )
412 .bind(now)
413 .bind(self.start.to_bytes())
414 .bind(self.end.to_bytes())
415 .bind(self.price)
416 .bind(self.liquidity)
417 .bind(candle.map(|x| x.direct_volume).unwrap_or_default())
418 .bind(candle.map(|x| x.swap_volume).unwrap_or_default())
419 .bind(self.start_price_indexing_denom)
420 .bind(metrics.trades)
421 .execute(dbtx.as_mut())
422 .await?;
423 Ok(())
424 }
425 }
426
427 pub async fn update_summaries(
428 dbtx: &mut PgTransaction<'_>,
429 now: DateTime,
430 window: Window,
431 ) -> anyhow::Result<()> {
432 let then = window.subtract_from(now);
433 let pairs = sqlx::query_as::<_, (Vec<u8>, Vec<u8>)>(
434 "SELECT asset_start, asset_end FROM dex_ex_pairs_block_snapshot GROUP BY asset_start, asset_end",
435 )
436 .fetch_all(dbtx.as_mut())
437 .await?;
438 for (start, end) in pairs {
439 sqlx::query(
440 "
441 WITH
442 snapshots AS (
443 SELECT *
444 FROM dex_ex_pairs_block_snapshot
445 WHERE asset_start = $1
446 AND asset_end = $2
447 ),
448 previous AS (
449 SELECT price AS price_then, liquidity AS liquidity_then
450 FROM snapshots
451 WHERE time <= $4
452 ORDER BY time DESC
453 LIMIT 1
454 ),
455 previous_or_default AS (
456 SELECT
457 COALESCE((SELECT price_then FROM previous), 0.0) AS price_then,
458 COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then
459 ),
460 now AS (
461 SELECT price, liquidity
462 FROM snapshots
463 WHERE time <= $3
464 ORDER BY time DESC
465 LIMIT 1
466 ),
467 sums AS (
468 SELECT
469 COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
470 COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
471 COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
472 COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
473 COALESCE(SUM(trades), 0.0) AS trades_over_window,
474 COALESCE(MIN(price), 0.0) AS low,
475 COALESCE(MAX(price), 0.0) AS high
476 FROM snapshots
477 WHERE time <= $3
478 AND time >= $4
479 )
480 INSERT INTO dex_ex_pairs_summary
481 SELECT
482 $1, $2, $5,
483 price, price_then,
484 low, high,
485 liquidity, liquidity_then,
486 direct_volume_over_window,
487 swap_volume_over_window,
488 direct_volume_indexing_denom_over_window,
489 swap_volume_indexing_denom_over_window,
490 trades_over_window
491 FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
492 ON CONFLICT (asset_start, asset_end, the_window)
493 DO UPDATE SET
494 price = EXCLUDED.price,
495 price_then = EXCLUDED.price_then,
496 liquidity = EXCLUDED.liquidity,
497 liquidity_then = EXCLUDED.liquidity_then,
498 direct_volume_over_window = EXCLUDED.direct_volume_over_window,
499 swap_volume_over_window = EXCLUDED.swap_volume_over_window,
500 direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
501 swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
502 trades_over_window = EXCLUDED.trades_over_window
503 ",
504 )
505 .bind(start)
506 .bind(end)
507 .bind(now)
508 .bind(then)
509 .bind(window.to_string())
510 .execute(dbtx.as_mut())
511 .await?;
512 }
513 Ok(())
514 }
515
516 pub async fn update_aggregate_summary(
517 dbtx: &mut PgTransaction<'_>,
518 window: Window,
519 denom: asset::Id,
520 min_liquidity: f64,
521 ) -> anyhow::Result<()> {
522 sqlx::query(
524 "
525 WITH
526 eligible_denoms AS (
527 SELECT asset_start as asset, price
528 FROM dex_ex_pairs_summary
529 WHERE asset_end = $1
530 AND liquidity >= $2
531 UNION VALUES ($1, 1.0)
532 ),
533 converted_pairs_summary AS (
534 SELECT
535 asset_start, asset_end,
536 (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
537 liquidity * ed_end.price AS liquidity,
538 direct_volume_indexing_denom_over_window AS dv,
539 swap_volume_indexing_denom_over_window AS sv,
540 trades_over_window as trades
541 FROM dex_ex_pairs_summary
542 JOIN eligible_denoms AS ed_end
543 ON ed_end.asset = asset_end
544 JOIN eligible_denoms AS ed_start
545 ON ed_start.asset = asset_start
546 WHERE the_window = $3
547 ),
548 sums AS (
549 SELECT
550 SUM(dv) AS direct_volume,
551 SUM(sv) AS swap_volume,
552 SUM(trades) AS trades
553 FROM converted_pairs_summary WHERE asset_start < asset_end
554 ),
555 total_liquidity AS (
556 SELECT
557 SUM(liquidity) AS liquidity
558 FROM converted_pairs_summary
559 ),
560 undirected_pairs_summary AS (
561 WITH pairs AS (
562 SELECT asset_start AS a_start, asset_end AS a_end FROM converted_pairs_summary WHERE asset_start < asset_end
563 )
564 SELECT
565 a_start AS asset_start,
566 a_end AS asset_end,
567 (SELECT SUM(sv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS sv,
568 (SELECT SUM(dv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS dv
569 FROM pairs
570 ),
571 counts AS (
572 SELECT COUNT(*) AS active_pairs FROM undirected_pairs_summary WHERE dv > 0
573 ),
574 largest_sv AS (
575 SELECT
576 asset_start AS largest_sv_trading_pair_start,
577 asset_end AS largest_sv_trading_pair_end,
578 sv AS largest_sv_trading_pair_volume
579 FROM undirected_pairs_summary
580 ORDER BY sv DESC
581 LIMIT 1
582 ),
583 largest_dv AS (
584 SELECT
585 asset_start AS largest_dv_trading_pair_start,
586 asset_end AS largest_dv_trading_pair_end,
587 dv AS largest_dv_trading_pair_volume
588 FROM undirected_pairs_summary
589 ORDER BY dv DESC
590 LIMIT 1
591 ),
592 top_price_mover AS (
593 SELECT
594 asset_start AS top_price_mover_start,
595 asset_end AS top_price_mover_end,
596 price_change AS top_price_mover_change_percent
597 FROM converted_pairs_summary
598 ORDER BY price_change DESC
599 LIMIT 1
600 )
601 INSERT INTO dex_ex_aggregate_summary
602 SELECT
603 $3,
604 direct_volume, swap_volume, liquidity, trades, active_pairs,
605 largest_sv_trading_pair_start,
606 largest_sv_trading_pair_end,
607 largest_sv_trading_pair_volume,
608 largest_dv_trading_pair_start,
609 largest_dv_trading_pair_end,
610 largest_dv_trading_pair_volume,
611 top_price_mover_start,
612 top_price_mover_end,
613 top_price_mover_change_percent
614 FROM
615 sums
616 JOIN largest_sv ON TRUE
617 JOIN largest_dv ON TRUE
618 JOIN top_price_mover ON TRUE
619 JOIN total_liquidity ON TRUE
620 JOIN counts ON TRUE
621 ON CONFLICT (the_window) DO UPDATE SET
622 direct_volume = EXCLUDED.direct_volume,
623 swap_volume = EXCLUDED.swap_volume,
624 liquidity = EXCLUDED.liquidity,
625 trades = EXCLUDED.trades,
626 active_pairs = EXCLUDED.active_pairs,
627 largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start,
628 largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end,
629 largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume,
630 largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start,
631 largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end,
632 largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume,
633 top_price_mover_start = EXCLUDED.top_price_mover_start,
634 top_price_mover_end = EXCLUDED.top_price_mover_end,
635 top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent
636 ")
637 .bind(denom.to_bytes())
638 .bind(min_liquidity)
639 .bind(window.to_string())
640 .execute(dbtx.as_mut())
641 .await?;
642 Ok(())
643 }
644}
645
646mod metadata {
647 use super::*;
648
649 pub async fn set(dbtx: &mut PgTransaction<'_>, quote_asset: asset::Id) -> anyhow::Result<()> {
650 sqlx::query(
651 "
652 INSERT INTO dex_ex_metadata
653 VALUES (1, $1)
654 ON CONFLICT (id) DO UPDATE
655 SET id = EXCLUDED.id,
656 quote_asset_id = EXCLUDED.quote_asset_id
657 ",
658 )
659 .bind(quote_asset.to_bytes())
660 .execute(dbtx.as_mut())
661 .await?;
662 Ok(())
663 }
664}
665
/// Per-block metrics accumulated for one directed trading pair.
#[derive(Debug, Default, Clone, Copy)]
struct PairMetrics {
    /// Number of trades seen for the pair (each swap trace hop adds one).
    trades: f64,
    /// Net liquidity change accumulated from reserve changes and LQT position rewards.
    liquidity_change: f64,
}
671
/// Summary of one direction of a batch swap, serialized to JSON for the block summary row.
#[derive(Debug, Clone, serde::Serialize)]
struct BatchSwapSummary {
    /// Asset that was swapped in.
    asset_start: asset::Id,
    /// Asset that was swapped out.
    asset_end: asset::Id,
    /// Total input amount of the swap execution.
    input: Amount,
    /// Total output amount of the swap execution.
    output: Amount,
    /// Number of swaps in the block that contributed input in this direction.
    num_swaps: i32,
    /// `output / input`, computed in floating point.
    price_float: f64,
}
681
/// Everything extracted from one block's events that this file needs.
#[derive(Debug)]
struct Events {
    /// Block time, taken from the block root event (if one was seen).
    time: Option<DateTime>,
    /// Block height.
    height: i32,
    /// Candle accumulated per directed pair from swap execution traces.
    candles: HashMap<DirectedTradingPair, Candle>,
    /// Trade counts and liquidity changes per directed pair.
    metrics: HashMap<DirectedTradingPair, PairMetrics>,
    /// Positions known for this block: opened in it, or loaded from the database.
    positions: BTreeMap<PositionId, Position>,
    /// Position-open events, in block order.
    position_opens: Vec<EventPositionOpen>,
    /// Position-execution events, in block order.
    position_executions: Vec<EventPositionExecution>,
    /// Position-close events, in block order.
    position_closes: Vec<EventPositionClose>,
    /// Position-withdraw events, in block order.
    position_withdrawals: Vec<EventPositionWithdraw>,
    /// Batch-swap events, in block order.
    batch_swaps: Vec<EventBatchSwap>,
    /// Swap events grouped by trading pair.
    swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
    /// Swap-claim events grouped by trading pair.
    swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
    /// Hash of the transaction that opened each position, when the event carried one.
    position_open_txs: BTreeMap<PositionId, [u8; 32]>,
    /// Hash of the transaction that queued each position's close, when the event carried one.
    position_close_txs: BTreeMap<PositionId, [u8; 32]>,
    /// Hash of the transaction that withdrew each position, when the event carried one.
    position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
}
703
704impl Events {
705 fn new() -> Self {
706 Self {
707 time: None,
708 height: 0,
709 candles: HashMap::new(),
710 metrics: HashMap::new(),
711 positions: BTreeMap::new(),
712 position_opens: Vec::new(),
713 position_executions: Vec::new(),
714 position_closes: Vec::new(),
715 position_withdrawals: Vec::new(),
716 batch_swaps: Vec::new(),
717 swaps: BTreeMap::new(),
718 swap_claims: BTreeMap::new(),
719 position_open_txs: BTreeMap::new(),
720 position_close_txs: BTreeMap::new(),
721 position_withdrawal_txs: BTreeMap::new(),
722 }
723 }
724
725 fn with_time(&mut self, time: DateTime) {
726 self.time = Some(time)
727 }
728
729 fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
730 if !self.metrics.contains_key(pair) {
731 self.metrics.insert(*pair, PairMetrics::default());
732 }
733 self.metrics.get_mut(pair).unwrap()
735 }
736
737 fn with_trace(&mut self, input: Value, output: Value) {
738 let pair_1_2 = DirectedTradingPair {
739 start: input.asset_id,
740 end: output.asset_id,
741 };
742 if pair_1_2.start == pair_1_2.end {
744 tracing::warn!(?input, ?output, "found trace with a loop?");
745 return;
746 }
747 let pair_2_1 = pair_1_2.flip();
748 let input_amount = f64::from(input.amount);
749 let output_amount = f64::from(output.amount);
750 let price_1_2 = output_amount / input_amount;
751 let candle_1_2 = Candle::point(price_1_2, input_amount);
752 let candle_2_1 = Candle::point(1.0 / price_1_2, output_amount);
753 self.metric(&pair_1_2).trades += 1.0;
754 self.candles
755 .entry(pair_1_2)
756 .and_modify(|c| c.merge(&candle_1_2))
757 .or_insert(candle_1_2);
758 self.metric(&pair_2_1).trades += 1.0;
759 self.candles
760 .entry(pair_2_1)
761 .and_modify(|c| c.merge(&candle_2_1))
762 .or_insert(candle_2_1);
763 }
764
765 fn with_swap_execution(&mut self, se: &SwapExecution) {
766 for row in se.traces.iter() {
767 for window in row.windows(2) {
768 self.with_trace(window[0], window[1]);
769 }
770 }
771 let pair_1_2 = DirectedTradingPair {
772 start: se.input.asset_id,
773 end: se.output.asset_id,
774 };
775 if pair_1_2.start == pair_1_2.end {
778 return;
779 }
780 let pair_2_1 = pair_1_2.flip();
781 self.candles
782 .entry(pair_1_2)
783 .and_modify(|c| c.swap_volume += f64::from(se.input.amount));
784 self.candles
785 .entry(pair_2_1)
786 .and_modify(|c| c.swap_volume += f64::from(se.output.amount));
787 }
788
789 fn with_reserve_change(
790 &mut self,
791 pair: &TradingPair,
792 old_reserves: Option<Reserves>,
793 new_reserves: Reserves,
794 removed: bool,
795 ) {
796 let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
797 (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
798 (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
799 (_, Some(old), new) => (
800 (new.r1.value() as f64) - (old.r1.value() as f64),
801 (new.r2.value() as f64) - (old.r2.value() as f64),
802 ),
803 };
804 for (d_pair, diff) in [
805 (
806 DirectedTradingPair {
807 start: pair.asset_1(),
808 end: pair.asset_2(),
809 },
810 diff_2,
811 ),
812 (
813 DirectedTradingPair {
814 start: pair.asset_2(),
815 end: pair.asset_1(),
816 },
817 diff_1,
818 ),
819 ] {
820 self.metric(&d_pair).liquidity_change += diff;
821 }
822 }
823
824 pub fn extract(block: &BlockEvents, ignore_arb_executions: bool) -> anyhow::Result<Self> {
825 let mut out = Self::new();
826 out.height = block.height() as i32;
827
828 for event in block.events() {
829 if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
830 let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
831 "creating timestamp should succeed; timestamp: {}",
832 e.timestamp_seconds
833 ))?;
834 out.with_time(time);
835 } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
836 out.with_reserve_change(
837 &e.trading_pair,
838 None,
839 Reserves {
840 r1: e.reserves_1,
841 r2: e.reserves_2,
842 },
843 false,
844 );
845 if let Some(tx_hash) = event.tx_hash() {
846 out.position_open_txs.insert(e.position_id, tx_hash);
847 }
848 out.positions.insert(e.position_id, e.position.clone());
852 out.position_opens.push(e);
853 } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
854 out.with_reserve_change(
857 &e.trading_pair,
858 None,
859 Reserves {
860 r1: e.reserves_1,
861 r2: e.reserves_2,
862 },
863 true,
864 );
865 if let Some(tx_hash) = event.tx_hash() {
866 out.position_withdrawal_txs.insert(e.position_id, tx_hash);
867 }
868 out.position_withdrawals.push(e);
869 } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
870 out.with_reserve_change(
871 &e.trading_pair,
872 Some(Reserves {
873 r1: e.prev_reserves_1,
874 r2: e.prev_reserves_2,
875 }),
876 Reserves {
877 r1: e.reserves_1,
878 r2: e.reserves_2,
879 },
880 false,
881 );
882 out.position_executions.push(e);
883 } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) {
884 out.position_closes.push(e);
885 } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
886 if let Some(tx_hash) = event.tx_hash() {
889 out.position_close_txs.insert(e.position_id, tx_hash);
890 }
891 } else if let Ok(e) = EventSwap::try_from_event(&event.event) {
892 out.swaps
893 .entry(e.trading_pair)
894 .or_insert_with(Vec::new)
895 .push(e);
896 } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
897 out.swap_claims
898 .entry(e.trading_pair)
899 .or_insert_with(Vec::new)
900 .push(e);
901 } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
902 let pair = DirectedTradingPair {
903 start: e.incentivized_asset_id,
904 end: *STAKING_TOKEN_ASSET_ID,
905 };
906 out.metric(&pair).liquidity_change += e.reward_amount.value() as f64;
907 } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
908 if let Some(se) = e.swap_execution_1_for_2.as_ref() {
910 out.with_swap_execution(se);
911 }
912 if let Some(se) = e.swap_execution_2_for_1.as_ref() {
913 out.with_swap_execution(se);
914 }
915 out.batch_swaps.push(e);
916 } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) {
917 if !ignore_arb_executions {
918 out.with_swap_execution(&e.swap_execution);
919 }
920 }
921 }
922 Ok(out)
923 }
924
925 async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
926 let missing_positions: Vec<_> = self
928 .position_executions
929 .iter()
930 .map(|e| e.position_id)
931 .filter(|id| !self.positions.contains_key(id))
932 .collect();
933
934 if missing_positions.is_empty() {
935 return Ok(());
936 }
937
938 let rows = sqlx::query(
940 "SELECT position_raw
941 FROM dex_ex_position_state
942 WHERE position_id = ANY($1)",
943 )
944 .bind(
945 &missing_positions
946 .iter()
947 .map(|id| id.0.as_ref())
948 .collect::<Vec<_>>(),
949 )
950 .fetch_all(dbtx.as_mut())
951 .await?;
952
953 for row in rows {
955 let position_raw: Vec<u8> = row.get("position_raw");
956 let position = Position::decode(position_raw.as_slice())?;
957 self.positions.insert(position.id(), position);
958 }
959
960 Ok(())
961 }
962
963 pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
965 self.candles
966 .get(&DirectedTradingPair::new(asset, indexing_denom))
967 .map(|x| x.close)
968 }
969}
970
/// The DEX explorer indexing component and its configuration.
#[derive(Debug)]
pub struct Component {
    /// Asset used as the indexing denom.
    /// NOTE(review): presumably the `denom` passed to the aggregate summary — confirm in the rest of the file.
    denom: asset::Id,
    /// Minimum liquidity threshold.
    /// NOTE(review): presumably the `min_liquidity` passed to the aggregate summary — confirm in the rest of the file.
    min_liquidity: f64,
    /// Whether arb executions are ignored.
    /// NOTE(review): presumably forwarded to `Events::extract` — confirm in the rest of the file.
    ignore_arb_executions: bool,
}
977
978impl Component {
979 pub fn new(denom: asset::Id, min_liquidity: f64, ignore_arb_executions: bool) -> Self {
980 Self {
981 denom,
982 min_liquidity,
983 ignore_arb_executions,
984 }
985 }
986
987 async fn record_position_open(
988 &self,
989 dbtx: &mut PgTransaction<'_>,
990 time: DateTime,
991 height: i32,
992 tx_hash: Option<[u8; 32]>,
993 event: &EventPositionOpen,
994 ) -> anyhow::Result<()> {
995 let effective_price_1_to_2: f64 = event
997 .position
998 .phi
999 .orient_start(event.trading_pair.asset_1())
1000 .expect("position trading pair matches")
1001 .effective_price()
1002 .into();
1003
1004 let effective_price_2_to_1: f64 = event
1005 .position
1006 .phi
1007 .orient_start(event.trading_pair.asset_2())
1008 .expect("position trading pair matches")
1009 .effective_price()
1010 .into();
1011
1012 let opening_reserves_rowid = sqlx::query_scalar::<_, i32>(
1014 "INSERT INTO dex_ex_position_reserves (
1015 position_id,
1016 height,
1017 time,
1018 reserves_1,
1019 reserves_2
1020 ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1021 )
1022 .bind(event.position_id.0)
1023 .bind(height)
1024 .bind(time)
1025 .bind(BigDecimal::from(event.reserves_1.value()))
1026 .bind(BigDecimal::from(event.reserves_2.value()))
1027 .fetch_one(dbtx.as_mut())
1028 .await?;
1029
1030 sqlx::query(
1032 "INSERT INTO dex_ex_position_state (
1033 position_id,
1034 asset_1,
1035 asset_2,
1036 p,
1037 q,
1038 close_on_fill,
1039 fee_bps,
1040 effective_price_1_to_2,
1041 effective_price_2_to_1,
1042 position_raw,
1043 opening_time,
1044 opening_height,
1045 opening_tx,
1046 opening_reserves_rowid
1047 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1048 )
1049 .bind(event.position_id.0)
1050 .bind(event.trading_pair.asset_1().to_bytes())
1051 .bind(event.trading_pair.asset_2().to_bytes())
1052 .bind(BigDecimal::from(event.position.phi.component.p.value()))
1053 .bind(BigDecimal::from(event.position.phi.component.q.value()))
1054 .bind(event.position.close_on_fill)
1055 .bind(event.trading_fee as i32)
1056 .bind(effective_price_1_to_2)
1057 .bind(effective_price_2_to_1)
1058 .bind(event.position.encode_to_vec())
1059 .bind(time)
1060 .bind(height)
1061 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1062 .bind(opening_reserves_rowid)
1063 .execute(dbtx.as_mut())
1064 .await?;
1065
1066 Ok(())
1067 }
1068
1069 async fn record_swap_execution_traces(
1070 &self,
1071 dbtx: &mut PgTransaction<'_>,
1072 time: DateTime,
1073 height: i32,
1074 swap_execution: &SwapExecution,
1075 ) -> anyhow::Result<()> {
1076 let SwapExecution {
1077 traces,
1078 input: se_input,
1079 output: se_output,
1080 } = swap_execution;
1081
1082 let asset_start = se_input.asset_id;
1083 let asset_end = se_output.asset_id;
1084 let batch_input = se_input.amount;
1085 let batch_output = se_output.amount;
1086
1087 for trace in traces.iter() {
1088 let Some(input_value) = trace.first() else {
1089 continue;
1090 };
1091 let Some(output_value) = trace.last() else {
1092 continue;
1093 };
1094
1095 let input = input_value.amount;
1096 let output = output_value.amount;
1097
1098 let price_float = (output.value() as f64) / (input.value() as f64);
1099 let amount_hops = trace
1100 .iter()
1101 .map(|x| BigDecimal::from(x.amount.value()))
1102 .collect::<Vec<_>>();
1103 let position_id_hops: Vec<[u8; 32]> = vec![];
1104 let asset_hops = trace
1105 .iter()
1106 .map(|x| x.asset_id.to_bytes())
1107 .collect::<Vec<_>>();
1108
1109 sqlx::query(
1110 "INSERT INTO dex_ex_batch_swap_traces (
1111 height,
1112 time,
1113 input,
1114 output,
1115 batch_input,
1116 batch_output,
1117 price_float,
1118 asset_start,
1119 asset_end,
1120 asset_hops,
1121 amount_hops,
1122 position_id_hops
1123 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1124 )
1125 .bind(height)
1126 .bind(time)
1127 .bind(BigDecimal::from(input.value()))
1128 .bind(BigDecimal::from(output.value()))
1129 .bind(BigDecimal::from(batch_input.value()))
1130 .bind(BigDecimal::from(batch_output.value()))
1131 .bind(price_float)
1132 .bind(asset_start.to_bytes())
1133 .bind(asset_end.to_bytes())
1134 .bind(asset_hops)
1135 .bind(amount_hops)
1136 .bind(position_id_hops)
1137 .execute(dbtx.as_mut())
1138 .await?;
1139 }
1140
1141 Ok(())
1142 }
1143
1144 async fn record_block_summary(
1145 &self,
1146 dbtx: &mut PgTransaction<'_>,
1147 time: DateTime,
1148 height: i32,
1149 events: &Events,
1150 ) -> anyhow::Result<()> {
1151 let num_opened_lps = events.position_opens.len() as i32;
1152 let num_closed_lps = events.position_closes.len() as i32;
1153 let num_withdrawn_lps = events.position_withdrawals.len() as i32;
1154 let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
1155 let num_swap_claims = events
1156 .swap_claims
1157 .iter()
1158 .map(|(_, v)| v.len())
1159 .sum::<usize>() as i32;
1160 let num_txs = events.batch_swaps.len() as i32;
1161
1162 let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();
1163
1164 for event in &events.batch_swaps {
1165 let trading_pair = event.batch_swap_output_data.trading_pair;
1166
1167 if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
1168 let asset_start = swap_1_2.input.asset_id;
1169 let asset_end = swap_1_2.output.asset_id;
1170 let input = swap_1_2.input.amount;
1171 let output = swap_1_2.output.amount;
1172 let price_float = (output.value() as f64) / (input.value() as f64);
1173
1174 let empty_vec = vec![];
1175 let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1176 let filtered_swaps: Vec<_> = swaps_for_pair
1177 .iter()
1178 .filter(|swap| swap.delta_1_i != Amount::zero())
1179 .collect::<Vec<_>>();
1180 let num_swaps = filtered_swaps.len() as i32;
1181
1182 batch_swap_summaries.push(BatchSwapSummary {
1183 asset_start,
1184 asset_end,
1185 input,
1186 output,
1187 num_swaps,
1188 price_float,
1189 });
1190 }
1191
1192 if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
1193 let asset_start = swap_2_1.input.asset_id;
1194 let asset_end = swap_2_1.output.asset_id;
1195 let input = swap_2_1.input.amount;
1196 let output = swap_2_1.output.amount;
1197 let price_float = (output.value() as f64) / (input.value() as f64);
1198
1199 let empty_vec = vec![];
1200 let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1201 let filtered_swaps: Vec<_> = swaps_for_pair
1202 .iter()
1203 .filter(|swap| swap.delta_2_i != Amount::zero())
1204 .collect::<Vec<_>>();
1205 let num_swaps = filtered_swaps.len() as i32;
1206
1207 batch_swap_summaries.push(BatchSwapSummary {
1208 asset_start,
1209 asset_end,
1210 input,
1211 output,
1212 num_swaps,
1213 price_float,
1214 });
1215 }
1216 }
1217
1218 sqlx::query(
1219 "INSERT INTO dex_ex_block_summary (
1220 height,
1221 time,
1222 batch_swaps,
1223 num_open_lps,
1224 num_closed_lps,
1225 num_withdrawn_lps,
1226 num_swaps,
1227 num_swap_claims,
1228 num_txs
1229 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
1230 )
1231 .bind(height)
1232 .bind(time)
1233 .bind(serde_json::to_value(&batch_swap_summaries)?)
1234 .bind(num_opened_lps)
1235 .bind(num_closed_lps)
1236 .bind(num_withdrawn_lps)
1237 .bind(num_swaps)
1238 .bind(num_swap_claims)
1239 .bind(num_txs)
1240 .execute(dbtx.as_mut())
1241 .await?;
1242
1243 Ok(())
1244 }
1245
1246 async fn record_batch_swap_traces(
1247 &self,
1248 dbtx: &mut PgTransaction<'_>,
1249 time: DateTime,
1250 height: i32,
1251 event: &EventBatchSwap,
1252 ) -> anyhow::Result<()> {
1253 let EventBatchSwap {
1254 batch_swap_output_data: _,
1255 swap_execution_1_for_2,
1256 swap_execution_2_for_1,
1257 } = event;
1258
1259 if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
1260 self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
1261 .await?;
1262 }
1263
1264 if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
1265 self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
1266 .await?;
1267 }
1268
1269 Ok(())
1270 }
1271
1272 async fn record_position_execution(
1273 &self,
1274 dbtx: &mut PgTransaction<'_>,
1275 time: DateTime,
1276 height: i32,
1277 event: &EventPositionExecution,
1278 positions: &BTreeMap<PositionId, Position>,
1279 ) -> anyhow::Result<()> {
1280 let position = positions
1282 .get(&event.position_id)
1283 .expect("position must exist for execution");
1284 let current = Reserves {
1285 r1: event.reserves_1,
1286 r2: event.reserves_2,
1287 };
1288 let prev = Reserves {
1289 r1: event.prev_reserves_1,
1290 r2: event.prev_reserves_2,
1291 };
1292 let flows = Flows::from_phi_and_reserves(&position.phi, ¤t, &prev);
1293
1294 let reserves_rowid = sqlx::query_scalar::<_, i32>(
1296 "INSERT INTO dex_ex_position_reserves (
1297 position_id,
1298 height,
1299 time,
1300 reserves_1,
1301 reserves_2
1302 ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1303 )
1304 .bind(event.position_id.0)
1305 .bind(height)
1306 .bind(time)
1307 .bind(BigDecimal::from(event.reserves_1.value()))
1308 .bind(BigDecimal::from(event.reserves_2.value()))
1309 .fetch_one(dbtx.as_mut())
1310 .await?;
1311
1312 sqlx::query(
1314 "INSERT INTO dex_ex_position_executions (
1315 position_id,
1316 height,
1317 time,
1318 reserves_rowid,
1319 delta_1,
1320 delta_2,
1321 lambda_1,
1322 lambda_2,
1323 fee_1,
1324 fee_2,
1325 context_asset_start,
1326 context_asset_end
1327 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1328 )
1329 .bind(event.position_id.0)
1330 .bind(height)
1331 .bind(time)
1332 .bind(reserves_rowid)
1333 .bind(BigDecimal::from(flows.delta_1().value()))
1334 .bind(BigDecimal::from(flows.delta_2().value()))
1335 .bind(BigDecimal::from(flows.lambda_1().value()))
1336 .bind(BigDecimal::from(flows.lambda_2().value()))
1337 .bind(BigDecimal::from(flows.fee_1().value()))
1338 .bind(BigDecimal::from(flows.fee_2().value()))
1339 .bind(event.context.start.to_bytes())
1340 .bind(event.context.end.to_bytes())
1341 .execute(dbtx.as_mut())
1342 .await?;
1343
1344 Ok(())
1345 }
1346
1347 async fn record_position_close(
1348 &self,
1349 dbtx: &mut PgTransaction<'_>,
1350 time: DateTime,
1351 height: i32,
1352 tx_hash: Option<[u8; 32]>,
1353 event: &EventPositionClose,
1354 ) -> anyhow::Result<()> {
1355 sqlx::query(
1356 "UPDATE dex_ex_position_state
1357 SET closing_time = $1,
1358 closing_height = $2,
1359 closing_tx = $3
1360 WHERE position_id = $4",
1361 )
1362 .bind(time)
1363 .bind(height)
1364 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1365 .bind(event.position_id.0)
1366 .execute(dbtx.as_mut())
1367 .await?;
1368
1369 Ok(())
1370 }
1371
1372 async fn record_position_withdraw(
1373 &self,
1374 dbtx: &mut PgTransaction<'_>,
1375 time: DateTime,
1376 height: i32,
1377 tx_hash: Option<[u8; 32]>,
1378 event: &EventPositionWithdraw,
1379 ) -> anyhow::Result<()> {
1380 let reserves_rowid = sqlx::query_scalar::<_, i32>(
1382 "INSERT INTO dex_ex_position_reserves (
1383 position_id,
1384 height,
1385 time,
1386 reserves_1,
1387 reserves_2
1388 ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", )
1390 .bind(event.position_id.0)
1391 .bind(height)
1392 .bind(time)
1393 .bind(BigDecimal::from(0)) .fetch_one(dbtx.as_mut())
1395 .await?;
1396
1397 sqlx::query(
1398 "INSERT INTO dex_ex_position_withdrawals (
1399 position_id,
1400 height,
1401 time,
1402 withdrawal_tx,
1403 sequence,
1404 reserves_1,
1405 reserves_2,
1406 reserves_rowid
1407 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1408 )
1409 .bind(event.position_id.0)
1410 .bind(height)
1411 .bind(time)
1412 .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1413 .bind(event.sequence as i32)
1414 .bind(BigDecimal::from(event.reserves_1.value()))
1415 .bind(BigDecimal::from(event.reserves_2.value()))
1416 .bind(reserves_rowid)
1417 .execute(dbtx.as_mut())
1418 .await?;
1419
1420 Ok(())
1421 }
1422
1423 async fn record_transaction(
1424 &self,
1425 dbtx: &mut PgTransaction<'_>,
1426 time: DateTime,
1427 height: u64,
1428 transaction_id: [u8; 32],
1429 transaction: Transaction,
1430 ) -> anyhow::Result<()> {
1431 if transaction.transaction_body.actions.is_empty() {
1432 return Ok(());
1433 }
1434 sqlx::query(
1435 "INSERT INTO dex_ex_transactions (
1436 transaction_id,
1437 transaction,
1438 height,
1439 time
1440 ) VALUES ($1, $2, $3, $4)
1441 ",
1442 )
1443 .bind(transaction_id)
1444 .bind(transaction.encode_to_vec())
1445 .bind(i32::try_from(height)?)
1446 .bind(time)
1447 .execute(dbtx.as_mut())
1448 .await?;
1449
1450 Ok(())
1451 }
1452
1453 async fn record_all_transactions(
1454 &self,
1455 dbtx: &mut PgTransaction<'_>,
1456 time: DateTime,
1457 block: &BlockEvents,
1458 ) -> anyhow::Result<()> {
1459 for (tx_id, tx_bytes) in block.transactions() {
1460 let tx = Transaction::try_from(tx_bytes)?;
1461 let height = block.height();
1462 self.record_transaction(dbtx, time, height, tx_id, tx)
1463 .await?;
1464 }
1465 Ok(())
1466 }
1467}
1468
1469#[async_trait]
1470impl AppView for Component {
1471 async fn init_chain(
1472 &self,
1473 _dbtx: &mut PgTransaction,
1474 _: &serde_json::Value,
1475 ) -> Result<(), anyhow::Error> {
1476 Ok(())
1477 }
1478
1479 fn name(&self) -> String {
1480 "dex_ex".to_string()
1481 }
1482
1483 fn version(&self) -> Version {
1484 let hash: [u8; 32] = blake2b_simd::Params::default()
1485 .personal(b"option_hash")
1486 .hash_length(32)
1487 .to_state()
1488 .update(&self.denom.to_bytes())
1489 .update(&self.min_liquidity.to_le_bytes())
1490 .update(&[u8::from(self.ignore_arb_executions)])
1491 .finalize()
1492 .as_bytes()
1493 .try_into()
1494 .expect("Impossible 000-001: expected 32 byte hash");
1495 Version::with_major(1).with_option_hash(hash)
1496 }
1497
1498 async fn reset(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1499 for statement in include_str!("reset.sql").split(";") {
1500 sqlx::query(statement).execute(dbtx.as_mut()).await?;
1501 }
1502 Ok(())
1503 }
1504
1505 async fn on_startup(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1506 for statement in include_str!("schema.sql").split(";") {
1507 sqlx::query(statement).execute(dbtx.as_mut()).await?;
1508 }
1509 Ok(())
1510 }
1511
1512 async fn index_batch(
1513 &self,
1514 dbtx: &mut PgTransaction,
1515 batch: EventBatch,
1516 ctx: EventBatchContext,
1517 ) -> Result<(), anyhow::Error> {
1518 metadata::set(dbtx, self.denom).await?;
1519 let mut charts = HashMap::new();
1520 let mut snapshots = HashMap::new();
1521 let mut last_time = None;
1522 for block in batch.events_by_block() {
1523 let mut events = Events::extract(&block, self.ignore_arb_executions)?;
1524 let time = events
1525 .time
1526 .expect(&format!("no block root event at height {}", block.height()));
1527 last_time = Some(time);
1528
1529 self.record_all_transactions(dbtx, time, block).await?;
1530
1531 events.load_positions(dbtx).await?;
1533
1534 self.record_block_summary(dbtx, time, block.height() as i32, &events)
1536 .await?;
1537
1538 for event in &events.batch_swaps {
1540 self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
1541 .await?;
1542 }
1543
1544 for event in &events.position_opens {
1546 let tx_hash = events.position_open_txs.get(&event.position_id).copied();
1547 self.record_position_open(dbtx, time, events.height, tx_hash, event)
1548 .await?;
1549 }
1550
1551 for event in &events.position_executions {
1553 self.record_position_execution(dbtx, time, events.height, event, &events.positions)
1554 .await?;
1555 }
1556
1557 for event in &events.position_closes {
1559 let tx_hash = events.position_close_txs.get(&event.position_id).copied();
1560 self.record_position_close(dbtx, time, events.height, tx_hash, event)
1561 .await?;
1562 }
1563
1564 for event in &events.position_withdrawals {
1566 let tx_hash = events
1567 .position_withdrawal_txs
1568 .get(&event.position_id)
1569 .copied();
1570 self.record_position_withdraw(dbtx, time, events.height, tx_hash, event)
1571 .await?;
1572 }
1573
1574 for (pair, candle) in &events.candles {
1575 sqlx::query(
1576 "INSERT INTO dex_ex.candles VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
1577 )
1578 .bind(i64::try_from(block.height())?)
1579 .bind(time)
1580 .bind(pair.start.to_bytes())
1581 .bind(pair.end.to_bytes())
1582 .bind(candle.open)
1583 .bind(candle.close)
1584 .bind(candle.low)
1585 .bind(candle.high)
1586 .bind(candle.direct_volume)
1587 .bind(candle.swap_volume)
1588 .execute(dbtx.as_mut())
1589 .await?;
1590 for window in Window::all() {
1591 let key = (pair.start, pair.end, window);
1592 if !charts.contains_key(&key) {
1593 let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?;
1594 charts.insert(key, ctx);
1595 }
1596 charts
1597 .get_mut(&key)
1598 .unwrap() .update(dbtx, time, *candle)
1600 .await?;
1601 }
1602 }
1603
1604 let block_pairs = events
1605 .candles
1606 .keys()
1607 .chain(events.metrics.keys())
1608 .copied()
1609 .collect::<HashSet<_>>();
1610 for pair in block_pairs {
1611 if !snapshots.contains_key(&pair) {
1612 let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?;
1613 snapshots.insert(pair, ctx);
1614 }
1615 snapshots
1617 .get_mut(&pair)
1618 .unwrap()
1619 .update(
1620 dbtx,
1621 time,
1622 events.candles.get(&pair).copied(),
1623 events.metrics.get(&pair).copied().unwrap_or_default(),
1624 events.price_for(self.denom, pair.start),
1625 )
1626 .await?;
1627 }
1628 }
1629
1630 if ctx.is_last() {
1631 if let Some(now) = last_time {
1632 for window in Window::all() {
1633 summary::update_summaries(dbtx, now, window).await?;
1634 summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity)
1635 .await?;
1636 }
1637 }
1638 }
1639 for chart in charts.into_values() {
1640 chart.unload(dbtx).await?;
1641 }
1642 Ok(())
1643 }
1644}