pindexer/dex_ex/
mod.rs

1use anyhow::anyhow;
2use clap::Result;
3use cometindex::{
4    async_trait,
5    index::{BlockEvents, EventBatch, EventBatchContext, Version},
6    AppView, PgTransaction,
7};
8use penumbra_sdk_asset::{asset, Value, STAKING_TOKEN_ASSET_ID};
9use penumbra_sdk_dex::{
10    event::{
11        EventArbExecution, EventBatchSwap, EventPositionClose, EventPositionExecution,
12        EventPositionOpen, EventPositionWithdraw, EventQueuePositionClose,
13    },
14    lp::{position::Flows, Reserves},
15    DirectedTradingPair, SwapExecution, TradingPair,
16};
17use penumbra_sdk_dex::{
18    event::{EventSwap, EventSwapClaim},
19    lp::position::{Id as PositionId, Position},
20};
21use penumbra_sdk_funding::event::EventLqtPositionReward;
22use penumbra_sdk_num::Amount;
23use penumbra_sdk_proto::event::EventDomainType;
24use penumbra_sdk_proto::DomainType;
25use penumbra_sdk_sct::event::EventBlockRoot;
26use penumbra_sdk_transaction::Transaction;
27use sqlx::types::BigDecimal;
28use sqlx::Row;
29use std::collections::{BTreeMap, HashMap, HashSet};
30
31type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
32
33mod candle {
34    use super::DateTime;
35    use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
36    use penumbra_sdk_dex::CandlestickData;
37    use std::fmt::Display;
38
39    /// Candlestick data, unmoored from the prison of a particular block height.
40    ///
41    /// In other words, this can represent candlesticks which span arbitrary windows,
42    /// and not just a single block.
43    #[derive(Debug, Clone, Copy)]
44    pub struct Candle {
45        pub open: f64,
46        pub close: f64,
47        pub low: f64,
48        pub high: f64,
49        pub direct_volume: f64,
50        pub swap_volume: f64,
51    }
52
53    impl Candle {
54        pub fn from_candlestick_data(data: &CandlestickData) -> Self {
55            // The volume is tracked in terms of input asset.
56            // We can use the closing price (aka. the common clearing price) to convert
57            // the volume to the other direction i.e, the batch swap output.
58            Self {
59                open: data.open,
60                close: data.close,
61                low: data.low,
62                high: data.high,
63                direct_volume: data.direct_volume,
64                swap_volume: data.swap_volume,
65            }
66        }
67
68        pub fn point(price: f64, direct_volume: f64) -> Self {
69            Self {
70                open: price,
71                close: price,
72                low: price,
73                high: price,
74                direct_volume,
75                swap_volume: 0.0,
76            }
77        }
78
79        pub fn merge(&mut self, that: &Self) {
80            self.close = that.close;
81            self.low = self.low.min(that.low);
82            self.high = self.high.max(that.high);
83            self.direct_volume += that.direct_volume;
84            self.swap_volume += that.swap_volume;
85        }
86    }
87
88    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
89    pub enum Window {
90        W1m,
91        W15m,
92        W1h,
93        W4h,
94        W1d,
95        W1w,
96        W1mo,
97    }
98
99    impl Window {
100        pub fn all() -> impl Iterator<Item = Self> {
101            [
102                Window::W1m,
103                Window::W15m,
104                Window::W1h,
105                Window::W4h,
106                Window::W1d,
107                Window::W1w,
108                Window::W1mo,
109            ]
110            .into_iter()
111        }
112
113        /// Get the anchor for a given time.
114        ///
115        /// This is the latest time that "snaps" to a given anchor, dependent on the window.
116        ///
117        /// For example, the 1 minute window has an anchor every minute, the day window
118        /// every day, etc.
119        pub fn anchor(&self, time: DateTime) -> DateTime {
120            let (y, mo, d, h, m) = (
121                time.year(),
122                time.month(),
123                time.day(),
124                time.hour(),
125                time.minute(),
126            );
127            let out = match self {
128                Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
129                Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
130                Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
131                Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
132                Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
133                Window::W1w => Utc
134                    .with_ymd_and_hms(y, mo, d, 0, 0, 0)
135                    .single()
136                    .and_then(|x| {
137                        x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
138                    }),
139                Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
140            };
141            out.unwrap()
142        }
143
144        pub fn subtract_from(&self, time: DateTime) -> DateTime {
145            let delta = match self {
146                Window::W1m => TimeDelta::minutes(1),
147                Window::W15m => TimeDelta::minutes(15),
148                Window::W1h => TimeDelta::hours(1),
149                Window::W4h => TimeDelta::hours(4),
150                Window::W1d => TimeDelta::days(1),
151                Window::W1w => TimeDelta::weeks(1),
152                Window::W1mo => TimeDelta::days(30),
153            };
154            time - delta
155        }
156    }
157
158    impl Display for Window {
159        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160            use Window::*;
161            let str = match self {
162                W1m => "1m",
163                W15m => "15m",
164                W1h => "1h",
165                W4h => "4h",
166                W1d => "1d",
167                W1w => "1w",
168                W1mo => "1mo",
169            };
170            write!(f, "{}", str)
171        }
172    }
173
174    #[derive(Debug)]
175    pub struct WindowedCandle {
176        start: DateTime,
177        window: Window,
178        candle: Candle,
179    }
180
181    impl WindowedCandle {
182        pub fn new(now: DateTime, window: Window, candle: Candle) -> Self {
183            Self {
184                start: window.anchor(now),
185                window,
186                candle,
187            }
188        }
189
190        /// Update with a new candlestick, at a given time.
191        ///
192        /// This may return the old candlestick and its start time, if we should be starting
193        /// a new candle, based on the candle for that window having already been closed.
194        pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> {
195            let start = self.window.anchor(now);
196            // This candle belongs to the next window!
197            if start > self.start {
198                let old_start = std::mem::replace(&mut self.start, start);
199                let old_candle = std::mem::replace(&mut self.candle, candle);
200                Some((old_start, old_candle))
201            } else {
202                self.candle.merge(&candle);
203                None
204            }
205        }
206
207        pub fn window(&self) -> Window {
208            self.window
209        }
210
211        pub fn flush(self) -> (DateTime, Candle) {
212            (self.start, self.candle)
213        }
214    }
215}
216pub use candle::{Candle, Window, WindowedCandle};
217
218mod price_chart {
219    use super::*;
220
221    /// A context when processing a price chart.
222    #[derive(Debug)]
223    pub struct Context {
224        asset_start: asset::Id,
225        asset_end: asset::Id,
226        window: Window,
227        state: Option<WindowedCandle>,
228    }
229
230    impl Context {
231        pub async fn load(
232            dbtx: &mut PgTransaction<'_>,
233            asset_start: asset::Id,
234            asset_end: asset::Id,
235            window: Window,
236        ) -> anyhow::Result<Self> {
237            let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as(
238                "
239                SELECT open, close, high, low, direct_volume, swap_volume, start_time
240                FROM dex_ex_price_charts
241                WHERE asset_start = $1
242                AND asset_end = $2
243                AND the_window = $3
244                ORDER BY start_time DESC
245                LIMIT 1
246            ",
247            )
248            .bind(asset_start.to_bytes())
249            .bind(asset_end.to_bytes())
250            .bind(window.to_string())
251            .fetch_optional(dbtx.as_mut())
252            .await?;
253            let state = row.map(
254                |(open, close, high, low, direct_volume, swap_volume, start)| {
255                    let candle = Candle {
256                        open,
257                        close,
258                        low,
259                        high,
260                        direct_volume,
261                        swap_volume,
262                    };
263                    WindowedCandle::new(start, window, candle)
264                },
265            );
266            Ok(Self {
267                asset_start,
268                asset_end,
269                window,
270                state,
271            })
272        }
273
274        async fn write_candle(
275            &self,
276            dbtx: &mut PgTransaction<'_>,
277            start: DateTime,
278            candle: Candle,
279        ) -> anyhow::Result<()> {
280            sqlx::query(
281                "
282            INSERT INTO dex_ex_price_charts(
283                id, asset_start, asset_end, the_window,
284                start_time, open, close, high, low, direct_volume, swap_volume
285            ) 
286            VALUES(
287                DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
288            )
289            ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET
290                open = EXCLUDED.open,
291                close = EXCLUDED.close,
292                high = EXCLUDED.high,
293                low = EXCLUDED.low,
294                direct_volume = EXCLUDED.direct_volume,
295                swap_volume = EXCLUDED.swap_volume
296            ",
297            )
298            .bind(self.asset_start.to_bytes())
299            .bind(self.asset_end.to_bytes())
300            .bind(self.window.to_string())
301            .bind(start)
302            .bind(candle.open)
303            .bind(candle.close)
304            .bind(candle.high)
305            .bind(candle.low)
306            .bind(candle.direct_volume)
307            .bind(candle.swap_volume)
308            .execute(dbtx.as_mut())
309            .await?;
310            Ok(())
311        }
312
313        pub async fn update(
314            &mut self,
315            dbtx: &mut PgTransaction<'_>,
316            now: DateTime,
317            candle: Candle,
318        ) -> anyhow::Result<()> {
319            let state = match self.state.as_mut() {
320                Some(x) => x,
321                None => {
322                    self.state = Some(WindowedCandle::new(now, self.window, candle));
323                    self.state.as_mut().unwrap()
324                }
325            };
326            if let Some((start, old_candle)) = state.with_candle(now, candle) {
327                self.write_candle(dbtx, start, old_candle).await?;
328            };
329            Ok(())
330        }
331
332        pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
333            let state = std::mem::replace(&mut self.state, None);
334            if let Some(state) = state {
335                let (start, candle) = state.flush();
336                self.write_candle(dbtx, start, candle).await?;
337            }
338            Ok(())
339        }
340    }
341}
342
343use price_chart::Context as PriceChartContext;
344
345mod summary {
346    use cometindex::PgTransaction;
347    use penumbra_sdk_asset::asset;
348
349    use super::{Candle, DateTime, PairMetrics, Window};
350
351    pub struct Context {
352        start: asset::Id,
353        end: asset::Id,
354        price: f64,
355        liquidity: f64,
356        start_price_indexing_denom: f64,
357    }
358
359    impl Context {
360        pub async fn load(
361            dbtx: &mut PgTransaction<'_>,
362            start: asset::Id,
363            end: asset::Id,
364        ) -> anyhow::Result<Self> {
365            let row: Option<(f64, f64, f64)> = sqlx::query_as(
366                "
367                SELECT price, liquidity, start_price_indexing_denom
368                FROM dex_ex_pairs_block_snapshot
369                WHERE asset_start = $1
370                AND asset_end = $2
371                ORDER BY id DESC
372                LIMIT 1
373            ",
374            )
375            .bind(start.to_bytes())
376            .bind(end.to_bytes())
377            .fetch_optional(dbtx.as_mut())
378            .await?;
379            let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
380            Ok(Self {
381                start,
382                end,
383                price,
384                liquidity,
385                start_price_indexing_denom,
386            })
387        }
388
389        pub async fn update(
390            &mut self,
391            dbtx: &mut PgTransaction<'_>,
392            now: DateTime,
393            candle: Option<Candle>,
394            metrics: PairMetrics,
395            start_price_indexing_denom: Option<f64>,
396        ) -> anyhow::Result<()> {
397            if let Some(candle) = candle {
398                self.price = candle.close;
399            }
400            if let Some(price) = start_price_indexing_denom {
401                self.start_price_indexing_denom = price;
402            }
403            self.liquidity += metrics.liquidity_change;
404
405            sqlx::query(
406                "
407            INSERT INTO dex_ex_pairs_block_snapshot VALUES (
408                DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
409            )        
410        ",
411            )
412            .bind(now)
413            .bind(self.start.to_bytes())
414            .bind(self.end.to_bytes())
415            .bind(self.price)
416            .bind(self.liquidity)
417            .bind(candle.map(|x| x.direct_volume).unwrap_or_default())
418            .bind(candle.map(|x| x.swap_volume).unwrap_or_default())
419            .bind(self.start_price_indexing_denom)
420            .bind(metrics.trades)
421            .execute(dbtx.as_mut())
422            .await?;
423            Ok(())
424        }
425    }
426
427    pub async fn update_summaries(
428        dbtx: &mut PgTransaction<'_>,
429        now: DateTime,
430        window: Window,
431    ) -> anyhow::Result<()> {
432        let then = window.subtract_from(now);
433        let pairs = sqlx::query_as::<_, (Vec<u8>, Vec<u8>)>(
434            "SELECT asset_start, asset_end FROM dex_ex_pairs_block_snapshot GROUP BY asset_start, asset_end",
435        )
436        .fetch_all(dbtx.as_mut())
437        .await?;
438        for (start, end) in pairs {
439            sqlx::query(
440                "
441            WITH
442            snapshots AS (
443                SELECT *
444                FROM dex_ex_pairs_block_snapshot
445                WHERE asset_start = $1
446                AND asset_end = $2
447            ),
448            previous AS (
449                SELECT price AS price_then, liquidity AS liquidity_then
450                FROM snapshots
451                WHERE time <= $4
452                ORDER BY time DESC
453                LIMIT 1
454            ),
455            previous_or_default AS (
456                SELECT
457                    COALESCE((SELECT price_then FROM previous), 0.0) AS price_then,
458                    COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then
459            ),
460            now AS (
461                SELECT price, liquidity            
462                FROM snapshots
463                WHERE time <= $3
464                ORDER BY time DESC
465                LIMIT 1
466            ),
467            sums AS (
468                SELECT
469                    COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
470                    COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
471                    COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
472                    COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
473                    COALESCE(SUM(trades), 0.0) AS trades_over_window,
474                    COALESCE(MIN(price), 0.0) AS low,
475                    COALESCE(MAX(price), 0.0) AS high
476                FROM snapshots
477                WHERE time <= $3
478                AND time >= $4
479            )
480            INSERT INTO dex_ex_pairs_summary
481            SELECT 
482                $1, $2, $5,
483                price, price_then,
484                low, high,
485                liquidity, liquidity_then,
486                direct_volume_over_window,
487                swap_volume_over_window,
488                direct_volume_indexing_denom_over_window,
489                swap_volume_indexing_denom_over_window,
490                trades_over_window
491            FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
492            ON CONFLICT (asset_start, asset_end, the_window)
493            DO UPDATE SET
494                price = EXCLUDED.price,
495                price_then = EXCLUDED.price_then,
496                liquidity = EXCLUDED.liquidity,
497                liquidity_then = EXCLUDED.liquidity_then,
498                direct_volume_over_window = EXCLUDED.direct_volume_over_window,
499                swap_volume_over_window = EXCLUDED.swap_volume_over_window,
500                direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
501                swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
502                trades_over_window = EXCLUDED.trades_over_window
503            ",
504            )
505            .bind(start)
506            .bind(end)
507            .bind(now)
508            .bind(then)
509            .bind(window.to_string())
510            .execute(dbtx.as_mut())
511            .await?;
512        }
513        Ok(())
514    }
515
516    pub async fn update_aggregate_summary(
517        dbtx: &mut PgTransaction<'_>,
518        window: Window,
519        denom: asset::Id,
520        min_liquidity: f64,
521    ) -> anyhow::Result<()> {
522        // TODO: do something here
523        sqlx::query(
524            "
525        WITH       
526            eligible_denoms AS (
527                SELECT asset_start as asset, price
528                FROM dex_ex_pairs_summary
529                WHERE asset_end = $1
530                AND liquidity >= $2
531                UNION VALUES ($1, 1.0)
532            ),
533            converted_pairs_summary AS (
534                SELECT
535                    asset_start, asset_end,
536                    (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
537                    liquidity * ed_end.price AS liquidity,
538                    direct_volume_indexing_denom_over_window AS dv,
539                    swap_volume_indexing_denom_over_window AS sv,
540                    trades_over_window as trades
541                FROM dex_ex_pairs_summary
542                JOIN eligible_denoms AS ed_end
543                ON ed_end.asset = asset_end
544                JOIN eligible_denoms AS ed_start
545                ON ed_start.asset = asset_start
546                WHERE the_window = $3
547            ),
548            sums AS (
549                SELECT
550                    SUM(dv) AS direct_volume,
551                    SUM(sv) AS swap_volume,
552                    SUM(trades) AS trades
553                FROM converted_pairs_summary WHERE asset_start < asset_end
554            ),
555            total_liquidity AS (
556                SELECT
557                    SUM(liquidity) AS liquidity
558                FROM converted_pairs_summary
559            ),
560            undirected_pairs_summary AS (
561              WITH pairs AS (
562                  SELECT asset_start AS a_start, asset_end AS a_end FROM converted_pairs_summary WHERE asset_start < asset_end
563              )
564              SELECT
565                  a_start AS asset_start,
566                  a_end AS asset_end,
567                  (SELECT SUM(sv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS sv,
568                  (SELECT SUM(dv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS dv
569              FROM pairs
570            ),
571            counts AS (
572              SELECT COUNT(*) AS active_pairs FROM undirected_pairs_summary WHERE dv > 0
573            ),
574            largest_sv AS (
575                SELECT
576                    asset_start AS largest_sv_trading_pair_start,
577                    asset_end AS largest_sv_trading_pair_end,
578                    sv AS largest_sv_trading_pair_volume                    
579                FROM undirected_pairs_summary
580                ORDER BY sv DESC
581                LIMIT 1
582            ),
583            largest_dv AS (
584                SELECT
585                    asset_start AS largest_dv_trading_pair_start,
586                    asset_end AS largest_dv_trading_pair_end,
587                    dv AS largest_dv_trading_pair_volume                    
588                FROM undirected_pairs_summary
589                ORDER BY dv DESC
590                LIMIT 1
591            ),
592            top_price_mover AS (
593                SELECT
594                    asset_start AS top_price_mover_start,
595                    asset_end AS top_price_mover_end,
596                    price_change AS top_price_mover_change_percent
597                FROM converted_pairs_summary
598                ORDER BY price_change DESC
599                LIMIT 1
600            )
601        INSERT INTO dex_ex_aggregate_summary
602        SELECT
603            $3,
604            direct_volume, swap_volume, liquidity, trades, active_pairs,
605            largest_sv_trading_pair_start,
606            largest_sv_trading_pair_end,
607            largest_sv_trading_pair_volume,
608            largest_dv_trading_pair_start,
609            largest_dv_trading_pair_end,
610            largest_dv_trading_pair_volume,
611            top_price_mover_start,
612            top_price_mover_end,
613            top_price_mover_change_percent
614        FROM
615            sums
616        JOIN largest_sv ON TRUE
617        JOIN largest_dv ON TRUE
618        JOIN top_price_mover ON TRUE
619        JOIN total_liquidity ON TRUE
620        JOIN counts ON TRUE
621        ON CONFLICT (the_window) DO UPDATE SET
622            direct_volume = EXCLUDED.direct_volume,
623            swap_volume = EXCLUDED.swap_volume,
624            liquidity = EXCLUDED.liquidity,
625            trades = EXCLUDED.trades,
626            active_pairs = EXCLUDED.active_pairs,          
627            largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start,
628            largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end,
629            largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume,
630            largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start,
631            largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end,
632            largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume,
633            top_price_mover_start = EXCLUDED.top_price_mover_start,
634            top_price_mover_end = EXCLUDED.top_price_mover_end,
635            top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent
636        ")
637        .bind(denom.to_bytes())
638        .bind(min_liquidity)
639        .bind(window.to_string())
640        .execute(dbtx.as_mut())
641        .await?;
642        Ok(())
643    }
644}
645
646mod metadata {
647    use super::*;
648
649    pub async fn set(dbtx: &mut PgTransaction<'_>, quote_asset: asset::Id) -> anyhow::Result<()> {
650        sqlx::query(
651            "
652        INSERT INTO dex_ex_metadata
653        VALUES (1, $1)
654        ON CONFLICT (id) DO UPDATE 
655        SET id = EXCLUDED.id,
656            quote_asset_id = EXCLUDED.quote_asset_id
657        ",
658        )
659        .bind(quote_asset.to_bytes())
660        .execute(dbtx.as_mut())
661        .await?;
662        Ok(())
663    }
664}
665
666#[derive(Debug, Default, Clone, Copy)]
667struct PairMetrics {
668    trades: f64,
669    liquidity_change: f64,
670}
671
672#[derive(Debug, Clone, serde::Serialize)]
673struct BatchSwapSummary {
674    asset_start: asset::Id,
675    asset_end: asset::Id,
676    input: Amount,
677    output: Amount,
678    num_swaps: i32,
679    price_float: f64,
680}
681
682#[derive(Debug)]
683struct Events {
684    time: Option<DateTime>,
685    height: i32,
686    candles: HashMap<DirectedTradingPair, Candle>,
687    metrics: HashMap<DirectedTradingPair, PairMetrics>,
688    // Relevant positions.
689    positions: BTreeMap<PositionId, Position>,
690    // Store events
691    position_opens: Vec<EventPositionOpen>,
692    position_executions: Vec<EventPositionExecution>,
693    position_closes: Vec<EventPositionClose>,
694    position_withdrawals: Vec<EventPositionWithdraw>,
695    batch_swaps: Vec<EventBatchSwap>,
696    swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
697    swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
698    // Track transaction hashes by position ID
699    position_open_txs: BTreeMap<PositionId, [u8; 32]>,
700    position_close_txs: BTreeMap<PositionId, [u8; 32]>,
701    position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
702}
703
704impl Events {
705    fn new() -> Self {
706        Self {
707            time: None,
708            height: 0,
709            candles: HashMap::new(),
710            metrics: HashMap::new(),
711            positions: BTreeMap::new(),
712            position_opens: Vec::new(),
713            position_executions: Vec::new(),
714            position_closes: Vec::new(),
715            position_withdrawals: Vec::new(),
716            batch_swaps: Vec::new(),
717            swaps: BTreeMap::new(),
718            swap_claims: BTreeMap::new(),
719            position_open_txs: BTreeMap::new(),
720            position_close_txs: BTreeMap::new(),
721            position_withdrawal_txs: BTreeMap::new(),
722        }
723    }
724
725    fn with_time(&mut self, time: DateTime) {
726        self.time = Some(time)
727    }
728
729    fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
730        if !self.metrics.contains_key(pair) {
731            self.metrics.insert(*pair, PairMetrics::default());
732        }
733        // NOPANIC: inserted above.
734        self.metrics.get_mut(pair).unwrap()
735    }
736
737    fn with_trace(&mut self, input: Value, output: Value) {
738        let pair_1_2 = DirectedTradingPair {
739            start: input.asset_id,
740            end: output.asset_id,
741        };
742        // This shouldn't happen, but to avoid weird stuff later, let's ignore this trace.
743        if pair_1_2.start == pair_1_2.end {
744            tracing::warn!(?input, ?output, "found trace with a loop?");
745            return;
746        }
747        let pair_2_1 = pair_1_2.flip();
748        let input_amount = f64::from(input.amount);
749        let output_amount = f64::from(output.amount);
750        if input_amount == 0.0 && output_amount == 0.0 {
751            tracing::warn!(?input, ?output, "ignoring trace with 0 input and output");
752            return;
753        }
754        let price_1_2 = output_amount / input_amount;
755        let candle_1_2 = Candle::point(price_1_2, input_amount);
756        let candle_2_1 = Candle::point(1.0 / price_1_2, output_amount);
757        self.metric(&pair_1_2).trades += 1.0;
758        self.candles
759            .entry(pair_1_2)
760            .and_modify(|c| c.merge(&candle_1_2))
761            .or_insert(candle_1_2);
762        self.metric(&pair_2_1).trades += 1.0;
763        self.candles
764            .entry(pair_2_1)
765            .and_modify(|c| c.merge(&candle_2_1))
766            .or_insert(candle_2_1);
767    }
768
769    fn with_swap_execution(&mut self, se: &SwapExecution) {
770        for row in se.traces.iter() {
771            for window in row.windows(2) {
772                self.with_trace(window[0], window[1]);
773            }
774        }
775        let pair_1_2 = DirectedTradingPair {
776            start: se.input.asset_id,
777            end: se.output.asset_id,
778        };
779        // When doing arb, we don't want to report the volume on UM -> UM,
780        // so we need this check.
781        if pair_1_2.start == pair_1_2.end {
782            return;
783        }
784        let pair_2_1 = pair_1_2.flip();
785        self.candles
786            .entry(pair_1_2)
787            .and_modify(|c| c.swap_volume += f64::from(se.input.amount));
788        self.candles
789            .entry(pair_2_1)
790            .and_modify(|c| c.swap_volume += f64::from(se.output.amount));
791    }
792
793    fn with_reserve_change(
794        &mut self,
795        pair: &TradingPair,
796        old_reserves: Option<Reserves>,
797        new_reserves: Reserves,
798        removed: bool,
799    ) {
800        let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
801            (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
802            (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
803            (_, Some(old), new) => (
804                (new.r1.value() as f64) - (old.r1.value() as f64),
805                (new.r2.value() as f64) - (old.r2.value() as f64),
806            ),
807        };
808        for (d_pair, diff) in [
809            (
810                DirectedTradingPair {
811                    start: pair.asset_1(),
812                    end: pair.asset_2(),
813                },
814                diff_2,
815            ),
816            (
817                DirectedTradingPair {
818                    start: pair.asset_2(),
819                    end: pair.asset_1(),
820                },
821                diff_1,
822            ),
823        ] {
824            self.metric(&d_pair).liquidity_change += diff;
825        }
826    }
827
828    pub fn extract(block: &BlockEvents, ignore_arb_executions: bool) -> anyhow::Result<Self> {
829        let mut out = Self::new();
830        out.height = block.height() as i32;
831
832        for event in block.events() {
833            if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
834                let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
835                    "creating timestamp should succeed; timestamp: {}",
836                    e.timestamp_seconds
837                ))?;
838                out.with_time(time);
839            } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
840                out.with_reserve_change(
841                    &e.trading_pair,
842                    None,
843                    Reserves {
844                        r1: e.reserves_1,
845                        r2: e.reserves_2,
846                    },
847                    false,
848                );
849                if let Some(tx_hash) = event.tx_hash() {
850                    out.position_open_txs.insert(e.position_id, tx_hash);
851                }
852                // A newly opened position might be executed against in this block,
853                // but wouldn't already be in the database. Adding it here ensures
854                // it's available.
855                out.positions.insert(e.position_id, e.position.clone());
856                out.position_opens.push(e);
857            } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
858                // TODO: use close positions to track liquidity more precisely, in practic I (ck) expect few
859                // positions to close with being withdrawn.
860                out.with_reserve_change(
861                    &e.trading_pair,
862                    None,
863                    Reserves {
864                        r1: e.reserves_1,
865                        r2: e.reserves_2,
866                    },
867                    true,
868                );
869                if let Some(tx_hash) = event.tx_hash() {
870                    out.position_withdrawal_txs.insert(e.position_id, tx_hash);
871                }
872                out.position_withdrawals.push(e);
873            } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
874                out.with_reserve_change(
875                    &e.trading_pair,
876                    Some(Reserves {
877                        r1: e.prev_reserves_1,
878                        r2: e.prev_reserves_2,
879                    }),
880                    Reserves {
881                        r1: e.reserves_1,
882                        r2: e.reserves_2,
883                    },
884                    false,
885                );
886                out.position_executions.push(e);
887            } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) {
888                out.position_closes.push(e);
889            } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
890                // The position close event is emitted by the dex module at EOB,
891                // so we need to track it with the tx hash of the closure tx.
892                if let Some(tx_hash) = event.tx_hash() {
893                    out.position_close_txs.insert(e.position_id, tx_hash);
894                }
895            } else if let Ok(e) = EventSwap::try_from_event(&event.event) {
896                out.swaps
897                    .entry(e.trading_pair)
898                    .or_insert_with(Vec::new)
899                    .push(e);
900            } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
901                out.swap_claims
902                    .entry(e.trading_pair)
903                    .or_insert_with(Vec::new)
904                    .push(e);
905            } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
906                let pair = DirectedTradingPair {
907                    start: e.incentivized_asset_id,
908                    end: *STAKING_TOKEN_ASSET_ID,
909                };
910                out.metric(&pair).liquidity_change += e.reward_amount.value() as f64;
911            } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
912                // NOTE: order matters here, 2 for 1 happened after.
913                if let Some(se) = e.swap_execution_1_for_2.as_ref() {
914                    out.with_swap_execution(se);
915                }
916                if let Some(se) = e.swap_execution_2_for_1.as_ref() {
917                    out.with_swap_execution(se);
918                }
919                out.batch_swaps.push(e);
920            } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) {
921                if !ignore_arb_executions {
922                    out.with_swap_execution(&e.swap_execution);
923                }
924            }
925        }
926        Ok(out)
927    }
928
929    async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
930        // Collect position IDs that we need but don't already have
931        let missing_positions: Vec<_> = self
932            .position_executions
933            .iter()
934            .map(|e| e.position_id)
935            .filter(|id| !self.positions.contains_key(id))
936            .collect();
937
938        if missing_positions.is_empty() {
939            return Ok(());
940        }
941
942        // Load missing positions from database
943        let rows = sqlx::query(
944            "SELECT position_raw 
945             FROM dex_ex_position_state 
946             WHERE position_id = ANY($1)",
947        )
948        .bind(
949            &missing_positions
950                .iter()
951                .map(|id| id.0.as_ref())
952                .collect::<Vec<_>>(),
953        )
954        .fetch_all(dbtx.as_mut())
955        .await?;
956
957        // Decode and store each position
958        for row in rows {
959            let position_raw: Vec<u8> = row.get("position_raw");
960            let position = Position::decode(position_raw.as_slice())?;
961            self.positions.insert(position.id(), position);
962        }
963
964        Ok(())
965    }
966
967    /// Attempt to find the price, relative to a given indexing denom, for a particular asset, in this block.
968    pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
969        self.candles
970            .get(&DirectedTradingPair::new(asset, indexing_denom))
971            .map(|x| x.close)
972    }
973}
974
975#[derive(Debug)]
976pub struct Component {
977    denom: asset::Id,
978    min_liquidity: f64,
979    ignore_arb_executions: bool,
980}
981
982impl Component {
983    pub fn new(denom: asset::Id, min_liquidity: f64, ignore_arb_executions: bool) -> Self {
984        Self {
985            denom,
986            min_liquidity,
987            ignore_arb_executions,
988        }
989    }
990
991    async fn record_position_open(
992        &self,
993        dbtx: &mut PgTransaction<'_>,
994        time: DateTime,
995        height: i32,
996        tx_hash: Option<[u8; 32]>,
997        event: &EventPositionOpen,
998    ) -> anyhow::Result<()> {
999        // Get effective prices by orienting the trading function in each direction
1000        let effective_price_1_to_2: f64 = event
1001            .position
1002            .phi
1003            .orient_start(event.trading_pair.asset_1())
1004            .expect("position trading pair matches")
1005            .effective_price()
1006            .into();
1007
1008        let effective_price_2_to_1: f64 = event
1009            .position
1010            .phi
1011            .orient_start(event.trading_pair.asset_2())
1012            .expect("position trading pair matches")
1013            .effective_price()
1014            .into();
1015
1016        // First insert initial reserves and get the rowid
1017        let opening_reserves_rowid = sqlx::query_scalar::<_, i32>(
1018            "INSERT INTO dex_ex_position_reserves (
1019                position_id,
1020                height,
1021                time,
1022                reserves_1,
1023                reserves_2
1024            ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1025        )
1026        .bind(event.position_id.0)
1027        .bind(height)
1028        .bind(time)
1029        .bind(BigDecimal::from(event.reserves_1.value()))
1030        .bind(BigDecimal::from(event.reserves_2.value()))
1031        .fetch_one(dbtx.as_mut())
1032        .await?;
1033
1034        // Then insert position state with the opening_reserves_rowid
1035        sqlx::query(
1036            "INSERT INTO dex_ex_position_state (
1037                position_id,
1038                asset_1,
1039                asset_2,
1040                p,
1041                q,
1042                close_on_fill,
1043                fee_bps,
1044                effective_price_1_to_2,
1045                effective_price_2_to_1,
1046                position_raw,
1047                opening_time,
1048                opening_height,
1049                opening_tx,
1050                opening_reserves_rowid
1051            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1052        )
1053        .bind(event.position_id.0)
1054        .bind(event.trading_pair.asset_1().to_bytes())
1055        .bind(event.trading_pair.asset_2().to_bytes())
1056        .bind(BigDecimal::from(event.position.phi.component.p.value()))
1057        .bind(BigDecimal::from(event.position.phi.component.q.value()))
1058        .bind(event.position.close_on_fill)
1059        .bind(event.trading_fee as i32)
1060        .bind(effective_price_1_to_2)
1061        .bind(effective_price_2_to_1)
1062        .bind(event.position.encode_to_vec())
1063        .bind(time)
1064        .bind(height)
1065        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1066        .bind(opening_reserves_rowid)
1067        .execute(dbtx.as_mut())
1068        .await?;
1069
1070        Ok(())
1071    }
1072
1073    async fn record_swap_execution_traces(
1074        &self,
1075        dbtx: &mut PgTransaction<'_>,
1076        time: DateTime,
1077        height: i32,
1078        swap_execution: &SwapExecution,
1079    ) -> anyhow::Result<()> {
1080        let SwapExecution {
1081            traces,
1082            input: se_input,
1083            output: se_output,
1084        } = swap_execution;
1085
1086        let asset_start = se_input.asset_id;
1087        let asset_end = se_output.asset_id;
1088        let batch_input = se_input.amount;
1089        let batch_output = se_output.amount;
1090
1091        for trace in traces.iter() {
1092            let Some(input_value) = trace.first() else {
1093                continue;
1094            };
1095            let Some(output_value) = trace.last() else {
1096                continue;
1097            };
1098
1099            let input = input_value.amount;
1100            let output = output_value.amount;
1101
1102            let price_float = (output.value() as f64) / (input.value() as f64);
1103            let amount_hops = trace
1104                .iter()
1105                .map(|x| BigDecimal::from(x.amount.value()))
1106                .collect::<Vec<_>>();
1107            let position_id_hops: Vec<[u8; 32]> = vec![];
1108            let asset_hops = trace
1109                .iter()
1110                .map(|x| x.asset_id.to_bytes())
1111                .collect::<Vec<_>>();
1112
1113            sqlx::query(
1114                "INSERT INTO dex_ex_batch_swap_traces (
1115                height,
1116                time,
1117                input,
1118                output,
1119                batch_input,
1120                batch_output,
1121                price_float,
1122                asset_start,
1123                asset_end,
1124                asset_hops,
1125                amount_hops,
1126               position_id_hops
1127            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1128            )
1129            .bind(height)
1130            .bind(time)
1131            .bind(BigDecimal::from(input.value()))
1132            .bind(BigDecimal::from(output.value()))
1133            .bind(BigDecimal::from(batch_input.value()))
1134            .bind(BigDecimal::from(batch_output.value()))
1135            .bind(price_float)
1136            .bind(asset_start.to_bytes())
1137            .bind(asset_end.to_bytes())
1138            .bind(asset_hops)
1139            .bind(amount_hops)
1140            .bind(position_id_hops)
1141            .execute(dbtx.as_mut())
1142            .await?;
1143        }
1144
1145        Ok(())
1146    }
1147
1148    async fn record_block_summary(
1149        &self,
1150        dbtx: &mut PgTransaction<'_>,
1151        time: DateTime,
1152        height: i32,
1153        events: &Events,
1154    ) -> anyhow::Result<()> {
1155        let num_opened_lps = events.position_opens.len() as i32;
1156        let num_closed_lps = events.position_closes.len() as i32;
1157        let num_withdrawn_lps = events.position_withdrawals.len() as i32;
1158        let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
1159        let num_swap_claims = events
1160            .swap_claims
1161            .iter()
1162            .map(|(_, v)| v.len())
1163            .sum::<usize>() as i32;
1164        let num_txs = events.batch_swaps.len() as i32;
1165
1166        let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();
1167
1168        for event in &events.batch_swaps {
1169            let trading_pair = event.batch_swap_output_data.trading_pair;
1170
1171            if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
1172                let asset_start = swap_1_2.input.asset_id;
1173                let asset_end = swap_1_2.output.asset_id;
1174                let input = swap_1_2.input.amount;
1175                let output = swap_1_2.output.amount;
1176                let price_float = (output.value() as f64) / (input.value() as f64);
1177
1178                let empty_vec = vec![];
1179                let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1180                let filtered_swaps: Vec<_> = swaps_for_pair
1181                    .iter()
1182                    .filter(|swap| swap.delta_1_i != Amount::zero())
1183                    .collect::<Vec<_>>();
1184                let num_swaps = filtered_swaps.len() as i32;
1185
1186                batch_swap_summaries.push(BatchSwapSummary {
1187                    asset_start,
1188                    asset_end,
1189                    input,
1190                    output,
1191                    num_swaps,
1192                    price_float,
1193                });
1194            }
1195
1196            if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
1197                let asset_start = swap_2_1.input.asset_id;
1198                let asset_end = swap_2_1.output.asset_id;
1199                let input = swap_2_1.input.amount;
1200                let output = swap_2_1.output.amount;
1201                let price_float = (output.value() as f64) / (input.value() as f64);
1202
1203                let empty_vec = vec![];
1204                let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1205                let filtered_swaps: Vec<_> = swaps_for_pair
1206                    .iter()
1207                    .filter(|swap| swap.delta_2_i != Amount::zero())
1208                    .collect::<Vec<_>>();
1209                let num_swaps = filtered_swaps.len() as i32;
1210
1211                batch_swap_summaries.push(BatchSwapSummary {
1212                    asset_start,
1213                    asset_end,
1214                    input,
1215                    output,
1216                    num_swaps,
1217                    price_float,
1218                });
1219            }
1220        }
1221
1222        sqlx::query(
1223            "INSERT INTO dex_ex_block_summary (
1224            height,
1225            time,
1226            batch_swaps,
1227            num_open_lps,
1228            num_closed_lps,
1229            num_withdrawn_lps,
1230            num_swaps,
1231            num_swap_claims,
1232            num_txs
1233        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
1234        )
1235        .bind(height)
1236        .bind(time)
1237        .bind(serde_json::to_value(&batch_swap_summaries)?)
1238        .bind(num_opened_lps)
1239        .bind(num_closed_lps)
1240        .bind(num_withdrawn_lps)
1241        .bind(num_swaps)
1242        .bind(num_swap_claims)
1243        .bind(num_txs)
1244        .execute(dbtx.as_mut())
1245        .await?;
1246
1247        Ok(())
1248    }
1249
1250    async fn record_batch_swap_traces(
1251        &self,
1252        dbtx: &mut PgTransaction<'_>,
1253        time: DateTime,
1254        height: i32,
1255        event: &EventBatchSwap,
1256    ) -> anyhow::Result<()> {
1257        let EventBatchSwap {
1258            batch_swap_output_data: _,
1259            swap_execution_1_for_2,
1260            swap_execution_2_for_1,
1261        } = event;
1262
1263        if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
1264            self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
1265                .await?;
1266        }
1267
1268        if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
1269            self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
1270                .await?;
1271        }
1272
1273        Ok(())
1274    }
1275
1276    async fn record_position_execution(
1277        &self,
1278        dbtx: &mut PgTransaction<'_>,
1279        time: DateTime,
1280        height: i32,
1281        event: &EventPositionExecution,
1282        positions: &BTreeMap<PositionId, Position>,
1283    ) -> anyhow::Result<()> {
1284        // Get the position that was executed against
1285        let position = positions
1286            .get(&event.position_id)
1287            .expect("position must exist for execution");
1288        let current = Reserves {
1289            r1: event.reserves_1,
1290            r2: event.reserves_2,
1291        };
1292        let prev = Reserves {
1293            r1: event.prev_reserves_1,
1294            r2: event.prev_reserves_2,
1295        };
1296        let flows = Flows::from_phi_and_reserves(&position.phi, &current, &prev);
1297
1298        // First insert the reserves and get the rowid
1299        let reserves_rowid = sqlx::query_scalar::<_, i32>(
1300            "INSERT INTO dex_ex_position_reserves (
1301                position_id,
1302                height,
1303                time,
1304                reserves_1,
1305                reserves_2
1306            ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1307        )
1308        .bind(event.position_id.0)
1309        .bind(height)
1310        .bind(time)
1311        .bind(BigDecimal::from(event.reserves_1.value()))
1312        .bind(BigDecimal::from(event.reserves_2.value()))
1313        .fetch_one(dbtx.as_mut())
1314        .await?;
1315
1316        // Then record the execution with the reserves_rowid
1317        sqlx::query(
1318            "INSERT INTO dex_ex_position_executions (
1319                position_id,
1320                height,
1321                time,
1322                reserves_rowid,
1323                delta_1,
1324                delta_2,
1325                lambda_1,
1326                lambda_2,
1327                fee_1,
1328                fee_2,
1329                context_asset_start,
1330                context_asset_end
1331            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1332        )
1333        .bind(event.position_id.0)
1334        .bind(height)
1335        .bind(time)
1336        .bind(reserves_rowid)
1337        .bind(BigDecimal::from(flows.delta_1().value()))
1338        .bind(BigDecimal::from(flows.delta_2().value()))
1339        .bind(BigDecimal::from(flows.lambda_1().value()))
1340        .bind(BigDecimal::from(flows.lambda_2().value()))
1341        .bind(BigDecimal::from(flows.fee_1().value()))
1342        .bind(BigDecimal::from(flows.fee_2().value()))
1343        .bind(event.context.start.to_bytes())
1344        .bind(event.context.end.to_bytes())
1345        .execute(dbtx.as_mut())
1346        .await?;
1347
1348        Ok(())
1349    }
1350
1351    async fn record_position_close(
1352        &self,
1353        dbtx: &mut PgTransaction<'_>,
1354        time: DateTime,
1355        height: i32,
1356        tx_hash: Option<[u8; 32]>,
1357        event: &EventPositionClose,
1358    ) -> anyhow::Result<()> {
1359        sqlx::query(
1360            "UPDATE dex_ex_position_state 
1361             SET closing_time = $1,
1362                 closing_height = $2,
1363                 closing_tx = $3
1364             WHERE position_id = $4",
1365        )
1366        .bind(time)
1367        .bind(height)
1368        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1369        .bind(event.position_id.0)
1370        .execute(dbtx.as_mut())
1371        .await?;
1372
1373        Ok(())
1374    }
1375
1376    async fn record_position_withdraw(
1377        &self,
1378        dbtx: &mut PgTransaction<'_>,
1379        time: DateTime,
1380        height: i32,
1381        tx_hash: Option<[u8; 32]>,
1382        event: &EventPositionWithdraw,
1383    ) -> anyhow::Result<()> {
1384        // First insert the final reserves state (zeros after withdrawal)
1385        let reserves_rowid = sqlx::query_scalar::<_, i32>(
1386            "INSERT INTO dex_ex_position_reserves (
1387                position_id,
1388                height,
1389                time,
1390                reserves_1,
1391                reserves_2
1392            ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", // Using $4 twice for zero values
1393        )
1394        .bind(event.position_id.0)
1395        .bind(height)
1396        .bind(time)
1397        .bind(BigDecimal::from(0)) // Both reserves become zero after withdrawal
1398        .fetch_one(dbtx.as_mut())
1399        .await?;
1400
1401        sqlx::query(
1402            "INSERT INTO dex_ex_position_withdrawals (
1403                position_id,
1404                height,
1405                time,
1406                withdrawal_tx,
1407                sequence,
1408                reserves_1,
1409                reserves_2,
1410                reserves_rowid
1411            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1412        )
1413        .bind(event.position_id.0)
1414        .bind(height)
1415        .bind(time)
1416        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1417        .bind(event.sequence as i32)
1418        .bind(BigDecimal::from(event.reserves_1.value()))
1419        .bind(BigDecimal::from(event.reserves_2.value()))
1420        .bind(reserves_rowid)
1421        .execute(dbtx.as_mut())
1422        .await?;
1423
1424        Ok(())
1425    }
1426
1427    async fn record_transaction(
1428        &self,
1429        dbtx: &mut PgTransaction<'_>,
1430        time: DateTime,
1431        height: u64,
1432        transaction_id: [u8; 32],
1433        transaction: Transaction,
1434    ) -> anyhow::Result<()> {
1435        if transaction.transaction_body.actions.is_empty() {
1436            return Ok(());
1437        }
1438        sqlx::query(
1439            "INSERT INTO dex_ex_transactions (
1440                transaction_id,
1441                transaction,
1442                height,
1443                time
1444            ) VALUES ($1, $2, $3, $4)
1445            ",
1446        )
1447        .bind(transaction_id)
1448        .bind(transaction.encode_to_vec())
1449        .bind(i32::try_from(height)?)
1450        .bind(time)
1451        .execute(dbtx.as_mut())
1452        .await?;
1453
1454        Ok(())
1455    }
1456
1457    async fn record_all_transactions(
1458        &self,
1459        dbtx: &mut PgTransaction<'_>,
1460        time: DateTime,
1461        block: &BlockEvents,
1462    ) -> anyhow::Result<()> {
1463        for (tx_id, tx_bytes) in block.transactions() {
1464            let tx = Transaction::try_from(tx_bytes)?;
1465            let height = block.height();
1466            self.record_transaction(dbtx, time, height, tx_id, tx)
1467                .await?;
1468        }
1469        Ok(())
1470    }
1471}
1472
1473#[async_trait]
1474impl AppView for Component {
1475    async fn init_chain(
1476        &self,
1477        _dbtx: &mut PgTransaction,
1478        _: &serde_json::Value,
1479    ) -> Result<(), anyhow::Error> {
1480        Ok(())
1481    }
1482
1483    fn name(&self) -> String {
1484        "dex_ex".to_string()
1485    }
1486
1487    fn version(&self) -> Version {
1488        let hash: [u8; 32] = blake2b_simd::Params::default()
1489            .personal(b"option_hash")
1490            .hash_length(32)
1491            .to_state()
1492            .update(&self.denom.to_bytes())
1493            .update(&self.min_liquidity.to_le_bytes())
1494            .update(&[u8::from(self.ignore_arb_executions)])
1495            .finalize()
1496            .as_bytes()
1497            .try_into()
1498            .expect("Impossible 000-001: expected 32 byte hash");
1499        Version::with_major(2).with_option_hash(hash)
1500    }
1501
1502    async fn reset(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1503        for statement in include_str!("reset.sql").split(";") {
1504            sqlx::query(statement).execute(dbtx.as_mut()).await?;
1505        }
1506        Ok(())
1507    }
1508
1509    async fn on_startup(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1510        for statement in include_str!("schema.sql").split(";") {
1511            sqlx::query(statement).execute(dbtx.as_mut()).await?;
1512        }
1513        Ok(())
1514    }
1515
1516    async fn index_batch(
1517        &self,
1518        dbtx: &mut PgTransaction,
1519        batch: EventBatch,
1520        ctx: EventBatchContext,
1521    ) -> Result<(), anyhow::Error> {
1522        metadata::set(dbtx, self.denom).await?;
1523        let mut charts = HashMap::new();
1524        let mut snapshots = HashMap::new();
1525        let mut last_time = None;
1526        for block in batch.events_by_block() {
1527            let mut events = Events::extract(&block, self.ignore_arb_executions)?;
1528            let time = events
1529                .time
1530                .expect(&format!("no block root event at height {}", block.height()));
1531            last_time = Some(time);
1532
1533            self.record_all_transactions(dbtx, time, block).await?;
1534
1535            // Load any missing positions before processing events
1536            events.load_positions(dbtx).await?;
1537
1538            // This is where we are going to build the block summary for the DEX.
1539            self.record_block_summary(dbtx, time, block.height() as i32, &events)
1540                .await?;
1541
1542            // Record batch swap execution traces.
1543            for event in &events.batch_swaps {
1544                self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
1545                    .await?;
1546            }
1547
1548            // Record position opens
1549            for event in &events.position_opens {
1550                let tx_hash = events.position_open_txs.get(&event.position_id).copied();
1551                self.record_position_open(dbtx, time, events.height, tx_hash, event)
1552                    .await?;
1553            }
1554
1555            // Process position executions
1556            for event in &events.position_executions {
1557                self.record_position_execution(dbtx, time, events.height, event, &events.positions)
1558                    .await?;
1559            }
1560
1561            // Record position closes
1562            for event in &events.position_closes {
1563                let tx_hash = events.position_close_txs.get(&event.position_id).copied();
1564                self.record_position_close(dbtx, time, events.height, tx_hash, event)
1565                    .await?;
1566            }
1567
1568            // Record position withdrawals
1569            for event in &events.position_withdrawals {
1570                let tx_hash = events
1571                    .position_withdrawal_txs
1572                    .get(&event.position_id)
1573                    .copied();
1574                self.record_position_withdraw(dbtx, time, events.height, tx_hash, event)
1575                    .await?;
1576            }
1577
1578            for (pair, candle) in &events.candles {
1579                sqlx::query(
1580                    "INSERT INTO dex_ex.candles VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
1581                )
1582                .bind(i64::try_from(block.height())?)
1583                .bind(time)
1584                .bind(pair.start.to_bytes())
1585                .bind(pair.end.to_bytes())
1586                .bind(candle.open)
1587                .bind(candle.close)
1588                .bind(candle.low)
1589                .bind(candle.high)
1590                .bind(candle.direct_volume)
1591                .bind(candle.swap_volume)
1592                .execute(dbtx.as_mut())
1593                .await?;
1594                for window in Window::all() {
1595                    let key = (pair.start, pair.end, window);
1596                    if !charts.contains_key(&key) {
1597                        let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?;
1598                        charts.insert(key, ctx);
1599                    }
1600                    charts
1601                        .get_mut(&key)
1602                        .unwrap() // safe because we just inserted above
1603                        .update(dbtx, time, *candle)
1604                        .await?;
1605                }
1606            }
1607
1608            let block_pairs = events
1609                .candles
1610                .keys()
1611                .chain(events.metrics.keys())
1612                .copied()
1613                .collect::<HashSet<_>>();
1614            for pair in block_pairs {
1615                if !snapshots.contains_key(&pair) {
1616                    let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?;
1617                    snapshots.insert(pair, ctx);
1618                }
1619                // NOPANIC: inserted above
1620                snapshots
1621                    .get_mut(&pair)
1622                    .unwrap()
1623                    .update(
1624                        dbtx,
1625                        time,
1626                        events.candles.get(&pair).copied(),
1627                        events.metrics.get(&pair).copied().unwrap_or_default(),
1628                        events.price_for(self.denom, pair.start),
1629                    )
1630                    .await?;
1631            }
1632        }
1633
1634        if ctx.is_last() {
1635            if let Some(now) = last_time {
1636                for window in Window::all() {
1637                    summary::update_summaries(dbtx, now, window).await?;
1638                    summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity)
1639                        .await?;
1640                }
1641            }
1642        }
1643        for chart in charts.into_values() {
1644            chart.unload(dbtx).await?;
1645        }
1646        Ok(())
1647    }
1648}