pindexer/dex_ex/
mod.rs

1use anyhow::anyhow;
2use clap::Result;
3use cometindex::{
4    async_trait,
5    index::{BlockEvents, EventBatch, EventBatchContext},
6    AppView, PgTransaction,
7};
8use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
9use penumbra_sdk_dex::{
10    event::{
11        EventBatchSwap, EventCandlestickData, EventPositionClose, EventPositionExecution,
12        EventPositionOpen, EventPositionWithdraw, EventQueuePositionClose,
13    },
14    lp::{position::Flows, Reserves},
15    DirectedTradingPair, SwapExecution, TradingPair,
16};
17use penumbra_sdk_dex::{
18    event::{EventSwap, EventSwapClaim},
19    lp::position::{Id as PositionId, Position},
20};
21use penumbra_sdk_funding::event::EventLqtPositionReward;
22use penumbra_sdk_num::Amount;
23use penumbra_sdk_proto::event::EventDomainType;
24use penumbra_sdk_proto::DomainType;
25use penumbra_sdk_sct::event::EventBlockRoot;
26use penumbra_sdk_transaction::Transaction;
27use sqlx::types::BigDecimal;
28use sqlx::Row;
29use std::collections::{BTreeMap, HashMap, HashSet};
30
31type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
32
33mod candle {
34    use super::DateTime;
35    use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
36    use penumbra_sdk_dex::CandlestickData;
37    use std::fmt::Display;
38
39    fn geo_mean(a: f64, b: f64) -> f64 {
40        (a * b).sqrt()
41    }
42
43    /// Candlestick data, unmoored from the prison of a particular block height.
44    ///
45    /// In other words, this can represent candlesticks which span arbitrary windows,
46    /// and not just a single block.
47    #[derive(Debug, Clone, Copy)]
48    pub struct Candle {
49        pub open: f64,
50        pub close: f64,
51        pub low: f64,
52        pub high: f64,
53        pub direct_volume: f64,
54        pub swap_volume: f64,
55    }
56
57    impl Candle {
58        pub fn from_candlestick_data(data: &CandlestickData) -> Self {
59            // The volume is tracked in terms of input asset.
60            // We can use the closing price (aka. the common clearing price) to convert
61            // the volume to the other direction i.e, the batch swap output.
62            Self {
63                open: data.open,
64                close: data.close,
65                low: data.low,
66                high: data.high,
67                direct_volume: data.direct_volume,
68                swap_volume: data.swap_volume,
69            }
70        }
71
72        pub fn merge(&mut self, that: &Self) {
73            self.close = that.close;
74            self.low = self.low.min(that.low);
75            self.high = self.high.max(that.high);
76            self.direct_volume += that.direct_volume;
77            self.swap_volume += that.swap_volume;
78        }
79
80        /// Mix this candle with a candle going in the opposite direction of the pair.
81        pub fn mix(&mut self, op: &Self) {
82            // We use the geometric mean, resulting in all the prices in a.mix(b) being
83            // the inverse of the prices in b.mix(a), and the volumes being equal.
84            self.close /= geo_mean(self.close, op.close);
85            self.open /= geo_mean(self.open, op.open);
86            self.low = self.low.min(1.0 / op.low);
87            self.high = self.high.min(1.0 / op.high);
88            // Using the closing price to look backwards at volume.
89            self.direct_volume += op.direct_volume / self.close;
90            self.swap_volume += op.swap_volume / self.close;
91        }
92
93        /// Flip this candle to get the equivalent in the other direction.
94        pub fn flip(&self) -> Self {
95            Self {
96                open: 1.0 / self.open,
97                close: 1.0 / self.close,
98                low: 1.0 / self.low,
99                high: 1.0 / self.high,
100                direct_volume: self.direct_volume * self.close,
101                swap_volume: self.swap_volume * self.close,
102            }
103        }
104    }
105
106    impl From<CandlestickData> for Candle {
107        fn from(value: CandlestickData) -> Self {
108            Self::from(&value)
109        }
110    }
111
112    impl From<&CandlestickData> for Candle {
113        fn from(value: &CandlestickData) -> Self {
114            Self::from_candlestick_data(value)
115        }
116    }
117
118    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
119    pub enum Window {
120        W1m,
121        W15m,
122        W1h,
123        W4h,
124        W1d,
125        W1w,
126        W1mo,
127    }
128
129    impl Window {
130        pub fn all() -> impl Iterator<Item = Self> {
131            [
132                Window::W1m,
133                Window::W15m,
134                Window::W1h,
135                Window::W4h,
136                Window::W1d,
137                Window::W1w,
138                Window::W1mo,
139            ]
140            .into_iter()
141        }
142
143        /// Get the anchor for a given time.
144        ///
145        /// This is the latest time that "snaps" to a given anchor, dependent on the window.
146        ///
147        /// For example, the 1 minute window has an anchor every minute, the day window
148        /// every day, etc.
149        pub fn anchor(&self, time: DateTime) -> DateTime {
150            let (y, mo, d, h, m) = (
151                time.year(),
152                time.month(),
153                time.day(),
154                time.hour(),
155                time.minute(),
156            );
157            let out = match self {
158                Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
159                Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
160                Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
161                Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
162                Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
163                Window::W1w => Utc
164                    .with_ymd_and_hms(y, mo, d, 0, 0, 0)
165                    .single()
166                    .and_then(|x| {
167                        x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
168                    }),
169                Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
170            };
171            out.unwrap()
172        }
173
174        pub fn subtract_from(&self, time: DateTime) -> DateTime {
175            let delta = match self {
176                Window::W1m => TimeDelta::minutes(1),
177                Window::W15m => TimeDelta::minutes(15),
178                Window::W1h => TimeDelta::hours(1),
179                Window::W4h => TimeDelta::hours(4),
180                Window::W1d => TimeDelta::days(1),
181                Window::W1w => TimeDelta::weeks(1),
182                Window::W1mo => TimeDelta::days(30),
183            };
184            time - delta
185        }
186    }
187
188    impl Display for Window {
189        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
190            use Window::*;
191            let str = match self {
192                W1m => "1m",
193                W15m => "15m",
194                W1h => "1h",
195                W4h => "4h",
196                W1d => "1d",
197                W1w => "1w",
198                W1mo => "1mo",
199            };
200            write!(f, "{}", str)
201        }
202    }
203
204    #[derive(Debug)]
205    pub struct WindowedCandle {
206        start: DateTime,
207        window: Window,
208        candle: Candle,
209    }
210
211    impl WindowedCandle {
212        pub fn new(now: DateTime, window: Window, candle: Candle) -> Self {
213            Self {
214                start: window.anchor(now),
215                window,
216                candle,
217            }
218        }
219
220        /// Update with a new candlestick, at a given time.
221        ///
222        /// This may return the old candlestick and its start time, if we should be starting
223        /// a new candle, based on the candle for that window having already been closed.
224        pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> {
225            let start = self.window.anchor(now);
226            // This candle belongs to the next window!
227            if start > self.start {
228                let old_start = std::mem::replace(&mut self.start, start);
229                let old_candle = std::mem::replace(&mut self.candle, candle);
230                Some((old_start, old_candle))
231            } else {
232                self.candle.merge(&candle);
233                None
234            }
235        }
236
237        pub fn window(&self) -> Window {
238            self.window
239        }
240
241        pub fn flush(self) -> (DateTime, Candle) {
242            (self.start, self.candle)
243        }
244    }
245}
246pub use candle::{Candle, Window, WindowedCandle};
247
248mod price_chart {
249    use super::*;
250
251    /// A context when processing a price chart.
252    #[derive(Debug)]
253    pub struct Context {
254        asset_start: asset::Id,
255        asset_end: asset::Id,
256        window: Window,
257        state: Option<WindowedCandle>,
258    }
259
260    impl Context {
261        pub async fn load(
262            dbtx: &mut PgTransaction<'_>,
263            asset_start: asset::Id,
264            asset_end: asset::Id,
265            window: Window,
266        ) -> anyhow::Result<Self> {
267            let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as(
268                "
269                SELECT open, close, high, low, direct_volume, swap_volume, start_time
270                FROM dex_ex_price_charts
271                WHERE asset_start = $1
272                AND asset_end = $2
273                AND the_window = $3
274                ORDER BY start_time DESC
275                LIMIT 1
276            ",
277            )
278            .bind(asset_start.to_bytes())
279            .bind(asset_end.to_bytes())
280            .bind(window.to_string())
281            .fetch_optional(dbtx.as_mut())
282            .await?;
283            let state = row.map(
284                |(open, close, low, high, direct_volume, swap_volume, start)| {
285                    let candle = Candle {
286                        open,
287                        close,
288                        low,
289                        high,
290                        direct_volume,
291                        swap_volume,
292                    };
293                    WindowedCandle::new(start, window, candle)
294                },
295            );
296            Ok(Self {
297                asset_start,
298                asset_end,
299                window,
300                state,
301            })
302        }
303
304        async fn write_candle(
305            &self,
306            dbtx: &mut PgTransaction<'_>,
307            start: DateTime,
308            candle: Candle,
309        ) -> anyhow::Result<()> {
310            sqlx::query(
311                "
312            INSERT INTO dex_ex_price_charts(
313                id, asset_start, asset_end, the_window,
314                start_time, open, close, high, low, direct_volume, swap_volume
315            ) 
316            VALUES(
317                DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
318            )
319            ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET
320                open = EXCLUDED.open,
321                close = EXCLUDED.close,
322                high = EXCLUDED.high,
323                low = EXCLUDED.low,
324                direct_volume = EXCLUDED.direct_volume,
325                swap_volume = EXCLUDED.swap_volume
326            ",
327            )
328            .bind(self.asset_start.to_bytes())
329            .bind(self.asset_end.to_bytes())
330            .bind(self.window.to_string())
331            .bind(start)
332            .bind(candle.open)
333            .bind(candle.close)
334            .bind(candle.high)
335            .bind(candle.low)
336            .bind(candle.direct_volume)
337            .bind(candle.swap_volume)
338            .execute(dbtx.as_mut())
339            .await?;
340            Ok(())
341        }
342
343        pub async fn update(
344            &mut self,
345            dbtx: &mut PgTransaction<'_>,
346            now: DateTime,
347            candle: Candle,
348        ) -> anyhow::Result<()> {
349            let state = match self.state.as_mut() {
350                Some(x) => x,
351                None => {
352                    self.state = Some(WindowedCandle::new(now, self.window, candle));
353                    self.state.as_mut().unwrap()
354                }
355            };
356            if let Some((start, old_candle)) = state.with_candle(now, candle) {
357                self.write_candle(dbtx, start, old_candle).await?;
358            };
359            Ok(())
360        }
361
362        pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
363            let state = std::mem::replace(&mut self.state, None);
364            if let Some(state) = state {
365                let (start, candle) = state.flush();
366                self.write_candle(dbtx, start, candle).await?;
367            }
368            Ok(())
369        }
370    }
371}
372
373use price_chart::Context as PriceChartContext;
374
375mod summary {
376    use cometindex::PgTransaction;
377    use penumbra_sdk_asset::asset;
378
379    use super::{Candle, DateTime, PairMetrics, Window};
380
381    pub struct Context {
382        start: asset::Id,
383        end: asset::Id,
384        price: f64,
385        liquidity: f64,
386        start_price_indexing_denom: f64,
387    }
388
389    impl Context {
390        pub async fn load(
391            dbtx: &mut PgTransaction<'_>,
392            start: asset::Id,
393            end: asset::Id,
394        ) -> anyhow::Result<Self> {
395            let row: Option<(f64, f64, f64)> = sqlx::query_as(
396                "
397                SELECT price, liquidity, start_price_indexing_denom
398                FROM dex_ex_pairs_block_snapshot
399                WHERE asset_start = $1
400                AND asset_end = $2
401                ORDER BY id DESC
402                LIMIT 1
403            ",
404            )
405            .bind(start.to_bytes())
406            .bind(end.to_bytes())
407            .fetch_optional(dbtx.as_mut())
408            .await?;
409            let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
410            Ok(Self {
411                start,
412                end,
413                price,
414                liquidity,
415                start_price_indexing_denom,
416            })
417        }
418
419        pub async fn update(
420            &mut self,
421            dbtx: &mut PgTransaction<'_>,
422            now: DateTime,
423            candle: Option<Candle>,
424            metrics: PairMetrics,
425            start_price_indexing_denom: Option<f64>,
426        ) -> anyhow::Result<()> {
427            if let Some(candle) = candle {
428                self.price = candle.close;
429            }
430            if let Some(price) = start_price_indexing_denom {
431                self.start_price_indexing_denom = price;
432            }
433            self.liquidity += metrics.liquidity_change;
434
435            sqlx::query(
436                "
437            INSERT INTO dex_ex_pairs_block_snapshot VALUES (
438                DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
439            )        
440        ",
441            )
442            .bind(now)
443            .bind(self.start.to_bytes())
444            .bind(self.end.to_bytes())
445            .bind(self.price)
446            .bind(self.liquidity)
447            .bind(candle.map(|x| x.direct_volume).unwrap_or_default())
448            .bind(candle.map(|x| x.swap_volume).unwrap_or_default())
449            .bind(self.start_price_indexing_denom)
450            .bind(metrics.trades)
451            .execute(dbtx.as_mut())
452            .await?;
453            Ok(())
454        }
455    }
456
457    pub async fn update_summaries(
458        dbtx: &mut PgTransaction<'_>,
459        now: DateTime,
460        window: Window,
461    ) -> anyhow::Result<()> {
462        let then = window.subtract_from(now);
463        let pairs = sqlx::query_as::<_, (Vec<u8>, Vec<u8>)>(
464            "SELECT asset_start, asset_end FROM dex_ex_pairs_block_snapshot GROUP BY asset_start, asset_end",
465        )
466        .fetch_all(dbtx.as_mut())
467        .await?;
468        for (start, end) in pairs {
469            sqlx::query(
470                "
471            WITH
472            snapshots AS (
473                SELECT *
474                FROM dex_ex_pairs_block_snapshot
475                WHERE asset_start = $1
476                AND asset_end = $2
477            ),
478            previous AS (
479                SELECT price AS price_then, liquidity AS liquidity_then
480                FROM snapshots
481                WHERE time <= $4
482                ORDER BY time DESC
483                LIMIT 1
484            ),
485            previous_or_default AS (
486                SELECT
487                    COALESCE((SELECT price_then FROM previous), 0.0) AS price_then,
488                    COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then
489            ),
490            now AS (
491                SELECT price, liquidity            
492                FROM snapshots
493                WHERE time <= $3
494                ORDER BY time DESC
495                LIMIT 1
496            ),
497            sums AS (
498                SELECT
499                    COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
500                    COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
501                    COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
502                    COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
503                    COALESCE(SUM(trades), 0.0) AS trades_over_window,
504                    COALESCE(MIN(price), 0.0) AS low,
505                    COALESCE(MAX(price), 0.0) AS high
506                FROM snapshots
507                WHERE time <= $3
508                AND time >= $4
509            )
510            INSERT INTO dex_ex_pairs_summary
511            SELECT 
512                $1, $2, $5,
513                price, price_then,
514                low, high,
515                liquidity, liquidity_then,
516                direct_volume_over_window,
517                swap_volume_over_window,
518                direct_volume_indexing_denom_over_window,
519                swap_volume_indexing_denom_over_window,
520                trades_over_window
521            FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
522            ON CONFLICT (asset_start, asset_end, the_window)
523            DO UPDATE SET
524                price = EXCLUDED.price,
525                price_then = EXCLUDED.price_then,
526                liquidity = EXCLUDED.liquidity,
527                liquidity_then = EXCLUDED.liquidity_then,
528                direct_volume_over_window = EXCLUDED.direct_volume_over_window,
529                swap_volume_over_window = EXCLUDED.swap_volume_over_window,
530                direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
531                swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
532                trades_over_window = EXCLUDED.trades_over_window
533            ",
534            )
535            .bind(start)
536            .bind(end)
537            .bind(now)
538            .bind(then)
539            .bind(window.to_string())
540            .execute(dbtx.as_mut())
541            .await?;
542        }
543        Ok(())
544    }
545
546    pub async fn update_aggregate_summary(
547        dbtx: &mut PgTransaction<'_>,
548        window: Window,
549        denom: asset::Id,
550        min_liquidity: f64,
551    ) -> anyhow::Result<()> {
552        // TODO: do something here
553        sqlx::query(
554            "
555        WITH       
556            eligible_denoms AS (
557                SELECT asset_start as asset, price
558                FROM dex_ex_pairs_summary
559                WHERE asset_end = $1
560                AND liquidity >= $2
561                UNION VALUES ($1, 1.0)
562            ),
563            converted_pairs_summary AS (
564                SELECT
565                    asset_start, asset_end,
566                    (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
567                    liquidity * ed_end.price AS liquidity,
568                    direct_volume_indexing_denom_over_window AS dv,
569                    swap_volume_indexing_denom_over_window AS sv,
570                    trades_over_window as trades
571                FROM dex_ex_pairs_summary
572                JOIN eligible_denoms AS ed_end
573                ON ed_end.asset = asset_end
574                JOIN eligible_denoms AS ed_start
575                ON ed_start.asset = asset_start
576                WHERE the_window = $3
577            ),
578            sums AS (
579                SELECT
580                    SUM(dv) AS direct_volume,
581                    SUM(sv) AS swap_volume,
582                    SUM(liquidity) AS liquidity,
583                    SUM(trades) AS trades,
584                    (SELECT COUNT(*) FROM converted_pairs_summary WHERE dv > 0 OR sv > 0) AS active_pairs
585                FROM converted_pairs_summary
586            ),
587            largest_sv AS (
588                SELECT
589                    asset_start AS largest_sv_trading_pair_start,
590                    asset_end AS largest_sv_trading_pair_end,
591                    sv AS largest_sv_trading_pair_volume                    
592                FROM converted_pairs_summary
593                ORDER BY sv DESC
594                LIMIT 1
595            ),
596            largest_dv AS (
597                SELECT
598                    asset_start AS largest_dv_trading_pair_start,
599                    asset_end AS largest_dv_trading_pair_end,
600                    dv AS largest_dv_trading_pair_volume                    
601                FROM converted_pairs_summary
602                ORDER BY dv DESC
603                LIMIT 1
604            ),
605            top_price_mover AS (
606                SELECT
607                    asset_start AS top_price_mover_start,
608                    asset_end AS top_price_mover_end,
609                    price_change AS top_price_mover_change_percent
610                FROM converted_pairs_summary
611                ORDER BY price_change DESC
612                LIMIT 1
613            )
614        INSERT INTO dex_ex_aggregate_summary
615        SELECT
616            $3,
617            direct_volume, swap_volume, liquidity, trades, active_pairs,
618            largest_sv_trading_pair_start,
619            largest_sv_trading_pair_end,
620            largest_sv_trading_pair_volume,
621            largest_dv_trading_pair_start,
622            largest_dv_trading_pair_end,
623            largest_dv_trading_pair_volume,
624            top_price_mover_start,
625            top_price_mover_end,
626            top_price_mover_change_percent
627        FROM
628            sums
629        JOIN largest_sv ON TRUE
630        JOIN largest_dv ON TRUE
631        JOIN top_price_mover ON TRUE
632        ON CONFLICT (the_window) DO UPDATE SET
633            direct_volume = EXCLUDED.direct_volume,
634            swap_volume = EXCLUDED.swap_volume,
635            liquidity = EXCLUDED.liquidity,
636            trades = EXCLUDED.trades,
637            active_pairs = EXCLUDED.active_pairs,          
638            largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start,
639            largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end,
640            largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume,
641            largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start,
642            largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end,
643            largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume,
644            top_price_mover_start = EXCLUDED.top_price_mover_start,
645            top_price_mover_end = EXCLUDED.top_price_mover_end,
646            top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent
647        ")
648        .bind(denom.to_bytes())
649        .bind(min_liquidity)
650        .bind(window.to_string())
651        .execute(dbtx.as_mut())
652        .await?;
653        Ok(())
654    }
655}
656
657mod metadata {
658    use super::*;
659
660    pub async fn set(dbtx: &mut PgTransaction<'_>, quote_asset: asset::Id) -> anyhow::Result<()> {
661        sqlx::query(
662            "
663        INSERT INTO dex_ex_metadata
664        VALUES (1, $1)
665        ON CONFLICT (id) DO UPDATE 
666        SET id = EXCLUDED.id,
667            quote_asset_id = EXCLUDED.quote_asset_id
668        ",
669        )
670        .bind(quote_asset.to_bytes())
671        .execute(dbtx.as_mut())
672        .await?;
673        Ok(())
674    }
675}
676
677#[derive(Debug, Default, Clone, Copy)]
678struct PairMetrics {
679    trades: f64,
680    liquidity_change: f64,
681}
682
683#[derive(Debug, Clone, serde::Serialize)]
684struct BatchSwapSummary {
685    asset_start: asset::Id,
686    asset_end: asset::Id,
687    input: Amount,
688    output: Amount,
689    num_swaps: i32,
690    price_float: f64,
691}
692
693#[derive(Debug)]
694struct Events {
695    time: Option<DateTime>,
696    height: i32,
697    candles: HashMap<DirectedTradingPair, Candle>,
698    metrics: HashMap<DirectedTradingPair, PairMetrics>,
699    // Relevant positions.
700    positions: BTreeMap<PositionId, Position>,
701    // Store events
702    position_opens: Vec<EventPositionOpen>,
703    position_executions: Vec<EventPositionExecution>,
704    position_closes: Vec<EventPositionClose>,
705    position_withdrawals: Vec<EventPositionWithdraw>,
706    batch_swaps: Vec<EventBatchSwap>,
707    swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
708    swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
709    // Track transaction hashes by position ID
710    position_open_txs: BTreeMap<PositionId, [u8; 32]>,
711    position_close_txs: BTreeMap<PositionId, [u8; 32]>,
712    position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
713}
714
715impl Events {
716    fn new() -> Self {
717        Self {
718            time: None,
719            height: 0,
720            candles: HashMap::new(),
721            metrics: HashMap::new(),
722            positions: BTreeMap::new(),
723            position_opens: Vec::new(),
724            position_executions: Vec::new(),
725            position_closes: Vec::new(),
726            position_withdrawals: Vec::new(),
727            batch_swaps: Vec::new(),
728            swaps: BTreeMap::new(),
729            swap_claims: BTreeMap::new(),
730            position_open_txs: BTreeMap::new(),
731            position_close_txs: BTreeMap::new(),
732            position_withdrawal_txs: BTreeMap::new(),
733        }
734    }
735
736    fn with_time(&mut self, time: DateTime) {
737        self.time = Some(time)
738    }
739
740    fn with_candle(&mut self, pair: DirectedTradingPair, candle: Candle) {
741        // Populate both this pair and the flipped pair, and if the flipped pair
742        // is already populated, we need to mix the two candles together.
743        let flip = pair.flip();
744        let new_candle = match self.candles.get(&flip).cloned() {
745            None => candle,
746            Some(flipped) => {
747                let mut out = candle;
748                out.mix(&flipped);
749                out
750            }
751        };
752
753        self.candles.insert(pair, new_candle);
754        self.candles.insert(flip, new_candle.flip());
755    }
756
757    fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
758        if !self.metrics.contains_key(pair) {
759            self.metrics.insert(*pair, PairMetrics::default());
760        }
761        // NOPANIC: inserted above.
762        self.metrics.get_mut(pair).unwrap()
763    }
764
765    fn with_trade(&mut self, pair: &DirectedTradingPair) {
766        self.metric(pair).trades += 1.0;
767    }
768
769    fn with_reserve_change(
770        &mut self,
771        pair: &TradingPair,
772        old_reserves: Option<Reserves>,
773        new_reserves: Reserves,
774        removed: bool,
775    ) {
776        let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
777            (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
778            (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
779            (_, Some(old), new) => (
780                (new.r1.value() as f64) - (old.r1.value() as f64),
781                (new.r2.value() as f64) - (old.r2.value() as f64),
782            ),
783        };
784        for (d_pair, diff) in [
785            (
786                DirectedTradingPair {
787                    start: pair.asset_1(),
788                    end: pair.asset_2(),
789                },
790                diff_2,
791            ),
792            (
793                DirectedTradingPair {
794                    start: pair.asset_2(),
795                    end: pair.asset_1(),
796                },
797                diff_1,
798            ),
799        ] {
800            self.metric(&d_pair).liquidity_change += diff;
801        }
802    }
803
804    pub fn extract(block: &BlockEvents) -> anyhow::Result<Self> {
805        let mut out = Self::new();
806        out.height = block.height() as i32;
807
808        for event in block.events() {
809            if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
810                let candle = Candle::from_candlestick_data(&e.stick);
811                out.with_candle(e.pair, candle);
812            } else if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
813                let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
814                    "creating timestamp should succeed; timestamp: {}",
815                    e.timestamp_seconds
816                ))?;
817                out.with_time(time);
818            } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
819                out.with_reserve_change(
820                    &e.trading_pair,
821                    None,
822                    Reserves {
823                        r1: e.reserves_1,
824                        r2: e.reserves_2,
825                    },
826                    false,
827                );
828                if let Some(tx_hash) = event.tx_hash() {
829                    out.position_open_txs.insert(e.position_id, tx_hash);
830                }
831                // A newly opened position might be executed against in this block,
832                // but wouldn't already be in the database. Adding it here ensures
833                // it's available.
834                out.positions.insert(e.position_id, e.position.clone());
835                out.position_opens.push(e);
836            } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
837                // TODO: use close positions to track liquidity more precisely, in practic I (ck) expect few
838                // positions to close with being withdrawn.
839                out.with_reserve_change(
840                    &e.trading_pair,
841                    None,
842                    Reserves {
843                        r1: e.reserves_1,
844                        r2: e.reserves_2,
845                    },
846                    true,
847                );
848                if let Some(tx_hash) = event.tx_hash() {
849                    out.position_withdrawal_txs.insert(e.position_id, tx_hash);
850                }
851                out.position_withdrawals.push(e);
852            } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
853                out.with_reserve_change(
854                    &e.trading_pair,
855                    Some(Reserves {
856                        r1: e.prev_reserves_1,
857                        r2: e.prev_reserves_2,
858                    }),
859                    Reserves {
860                        r1: e.reserves_1,
861                        r2: e.reserves_2,
862                    },
863                    false,
864                );
865                if e.reserves_1 > e.prev_reserves_1 {
866                    // Whatever asset we ended up with more with was traded in.
867                    out.with_trade(&DirectedTradingPair {
868                        start: e.trading_pair.asset_1(),
869                        end: e.trading_pair.asset_2(),
870                    });
871                } else if e.reserves_2 > e.prev_reserves_2 {
872                    out.with_trade(&DirectedTradingPair {
873                        start: e.trading_pair.asset_2(),
874                        end: e.trading_pair.asset_1(),
875                    });
876                }
877                out.position_executions.push(e);
878            } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) {
879                out.position_closes.push(e);
880            } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
881                // The position close event is emitted by the dex module at EOB,
882                // so we need to track it with the tx hash of the closure tx.
883                if let Some(tx_hash) = event.tx_hash() {
884                    out.position_close_txs.insert(e.position_id, tx_hash);
885                }
886            } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
887                out.batch_swaps.push(e);
888            } else if let Ok(e) = EventSwap::try_from_event(&event.event) {
889                out.swaps
890                    .entry(e.trading_pair)
891                    .or_insert_with(Vec::new)
892                    .push(e);
893            } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
894                out.swap_claims
895                    .entry(e.trading_pair)
896                    .or_insert_with(Vec::new)
897                    .push(e);
898            } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
899                let pair = DirectedTradingPair {
900                    start: e.incentivized_asset_id,
901                    end: *STAKING_TOKEN_ASSET_ID,
902                };
903                out.metric(&pair).liquidity_change += e.reward_amount.value() as f64;
904            }
905        }
906        Ok(out)
907    }
908
909    async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
910        // Collect position IDs that we need but don't already have
911        let missing_positions: Vec<_> = self
912            .position_executions
913            .iter()
914            .map(|e| e.position_id)
915            .filter(|id| !self.positions.contains_key(id))
916            .collect();
917
918        if missing_positions.is_empty() {
919            return Ok(());
920        }
921
922        // Load missing positions from database
923        let rows = sqlx::query(
924            "SELECT position_raw 
925             FROM dex_ex_position_state 
926             WHERE position_id = ANY($1)",
927        )
928        .bind(
929            &missing_positions
930                .iter()
931                .map(|id| id.0.as_ref())
932                .collect::<Vec<_>>(),
933        )
934        .fetch_all(dbtx.as_mut())
935        .await?;
936
937        // Decode and store each position
938        for row in rows {
939            let position_raw: Vec<u8> = row.get("position_raw");
940            let position = Position::decode(position_raw.as_slice())?;
941            self.positions.insert(position.id(), position);
942        }
943
944        Ok(())
945    }
946
947    /// Attempt to find the price, relative to a given indexing denom, for a particular asset, in this block.
948    pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
949        self.candles
950            .get(&DirectedTradingPair::new(asset, indexing_denom))
951            .map(|x| x.close)
952    }
953}
954
955#[derive(Debug)]
956pub struct Component {
957    denom: asset::Id,
958    min_liquidity: f64,
959}
960
961impl Component {
962    pub fn new(denom: asset::Id, min_liquidity: f64) -> Self {
963        Self {
964            denom,
965            min_liquidity,
966        }
967    }
968
969    async fn record_position_open(
970        &self,
971        dbtx: &mut PgTransaction<'_>,
972        time: DateTime,
973        height: i32,
974        tx_hash: Option<[u8; 32]>,
975        event: &EventPositionOpen,
976    ) -> anyhow::Result<()> {
977        // Get effective prices by orienting the trading function in each direction
978        let effective_price_1_to_2: f64 = event
979            .position
980            .phi
981            .orient_start(event.trading_pair.asset_1())
982            .expect("position trading pair matches")
983            .effective_price()
984            .into();
985
986        let effective_price_2_to_1: f64 = event
987            .position
988            .phi
989            .orient_start(event.trading_pair.asset_2())
990            .expect("position trading pair matches")
991            .effective_price()
992            .into();
993
994        // First insert initial reserves and get the rowid
995        let opening_reserves_rowid = sqlx::query_scalar::<_, i32>(
996            "INSERT INTO dex_ex_position_reserves (
997                position_id,
998                height,
999                time,
1000                reserves_1,
1001                reserves_2
1002            ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1003        )
1004        .bind(event.position_id.0)
1005        .bind(height)
1006        .bind(time)
1007        .bind(BigDecimal::from(event.reserves_1.value()))
1008        .bind(BigDecimal::from(event.reserves_2.value()))
1009        .fetch_one(dbtx.as_mut())
1010        .await?;
1011
1012        // Then insert position state with the opening_reserves_rowid
1013        sqlx::query(
1014            "INSERT INTO dex_ex_position_state (
1015                position_id,
1016                asset_1,
1017                asset_2,
1018                p,
1019                q,
1020                close_on_fill,
1021                fee_bps,
1022                effective_price_1_to_2,
1023                effective_price_2_to_1,
1024                position_raw,
1025                opening_time,
1026                opening_height,
1027                opening_tx,
1028                opening_reserves_rowid
1029            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1030        )
1031        .bind(event.position_id.0)
1032        .bind(event.trading_pair.asset_1().to_bytes())
1033        .bind(event.trading_pair.asset_2().to_bytes())
1034        .bind(BigDecimal::from(event.position.phi.component.p.value()))
1035        .bind(BigDecimal::from(event.position.phi.component.q.value()))
1036        .bind(event.position.close_on_fill)
1037        .bind(event.trading_fee as i32)
1038        .bind(effective_price_1_to_2)
1039        .bind(effective_price_2_to_1)
1040        .bind(event.position.encode_to_vec())
1041        .bind(time)
1042        .bind(height)
1043        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1044        .bind(opening_reserves_rowid)
1045        .execute(dbtx.as_mut())
1046        .await?;
1047
1048        Ok(())
1049    }
1050
1051    async fn record_swap_execution_traces(
1052        &self,
1053        dbtx: &mut PgTransaction<'_>,
1054        time: DateTime,
1055        height: i32,
1056        swap_execution: &SwapExecution,
1057    ) -> anyhow::Result<()> {
1058        let SwapExecution {
1059            traces,
1060            input: se_input,
1061            output: se_output,
1062        } = swap_execution;
1063
1064        let asset_start = se_input.asset_id;
1065        let asset_end = se_output.asset_id;
1066        let batch_input = se_input.amount;
1067        let batch_output = se_output.amount;
1068
1069        for trace in traces.iter() {
1070            let Some(input_value) = trace.first() else {
1071                continue;
1072            };
1073            let Some(output_value) = trace.last() else {
1074                continue;
1075            };
1076
1077            let input = input_value.amount;
1078            let output = output_value.amount;
1079
1080            let price_float = (output.value() as f64) / (input.value() as f64);
1081            let amount_hops = trace
1082                .iter()
1083                .map(|x| BigDecimal::from(x.amount.value()))
1084                .collect::<Vec<_>>();
1085            let position_id_hops: Vec<[u8; 32]> = vec![];
1086            let asset_hops = trace
1087                .iter()
1088                .map(|x| x.asset_id.to_bytes())
1089                .collect::<Vec<_>>();
1090
1091            sqlx::query(
1092                "INSERT INTO dex_ex_batch_swap_traces (
1093                height,
1094                time,
1095                input,
1096                output,
1097                batch_input,
1098                batch_output,
1099                price_float,
1100                asset_start,
1101                asset_end,
1102                asset_hops,
1103                amount_hops,
1104               position_id_hops
1105            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1106            )
1107            .bind(height)
1108            .bind(time)
1109            .bind(BigDecimal::from(input.value()))
1110            .bind(BigDecimal::from(output.value()))
1111            .bind(BigDecimal::from(batch_input.value()))
1112            .bind(BigDecimal::from(batch_output.value()))
1113            .bind(price_float)
1114            .bind(asset_start.to_bytes())
1115            .bind(asset_end.to_bytes())
1116            .bind(asset_hops)
1117            .bind(amount_hops)
1118            .bind(position_id_hops)
1119            .execute(dbtx.as_mut())
1120            .await?;
1121        }
1122
1123        Ok(())
1124    }
1125
1126    async fn record_block_summary(
1127        &self,
1128        dbtx: &mut PgTransaction<'_>,
1129        time: DateTime,
1130        height: i32,
1131        events: &Events,
1132    ) -> anyhow::Result<()> {
1133        let num_opened_lps = events.position_opens.len() as i32;
1134        let num_closed_lps = events.position_closes.len() as i32;
1135        let num_withdrawn_lps = events.position_withdrawals.len() as i32;
1136        let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
1137        let num_swap_claims = events
1138            .swap_claims
1139            .iter()
1140            .map(|(_, v)| v.len())
1141            .sum::<usize>() as i32;
1142        let num_txs = events.batch_swaps.len() as i32;
1143
1144        let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();
1145
1146        for event in &events.batch_swaps {
1147            let trading_pair = event.batch_swap_output_data.trading_pair;
1148
1149            if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
1150                let asset_start = swap_1_2.input.asset_id;
1151                let asset_end = swap_1_2.output.asset_id;
1152                let input = swap_1_2.input.amount;
1153                let output = swap_1_2.output.amount;
1154                let price_float = (output.value() as f64) / (input.value() as f64);
1155
1156                let empty_vec = vec![];
1157                let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1158                let filtered_swaps: Vec<_> = swaps_for_pair
1159                    .iter()
1160                    .filter(|swap| swap.delta_1_i != Amount::zero())
1161                    .collect::<Vec<_>>();
1162                let num_swaps = filtered_swaps.len() as i32;
1163
1164                batch_swap_summaries.push(BatchSwapSummary {
1165                    asset_start,
1166                    asset_end,
1167                    input,
1168                    output,
1169                    num_swaps,
1170                    price_float,
1171                });
1172            }
1173
1174            if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
1175                let asset_start = swap_2_1.input.asset_id;
1176                let asset_end = swap_2_1.output.asset_id;
1177                let input = swap_2_1.input.amount;
1178                let output = swap_2_1.output.amount;
1179                let price_float = (output.value() as f64) / (input.value() as f64);
1180
1181                let empty_vec = vec![];
1182                let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1183                let filtered_swaps: Vec<_> = swaps_for_pair
1184                    .iter()
1185                    .filter(|swap| swap.delta_2_i != Amount::zero())
1186                    .collect::<Vec<_>>();
1187                let num_swaps = filtered_swaps.len() as i32;
1188
1189                batch_swap_summaries.push(BatchSwapSummary {
1190                    asset_start,
1191                    asset_end,
1192                    input,
1193                    output,
1194                    num_swaps,
1195                    price_float,
1196                });
1197            }
1198        }
1199
1200        sqlx::query(
1201            "INSERT INTO dex_ex_block_summary (
1202            height,
1203            time,
1204            batch_swaps,
1205            num_open_lps,
1206            num_closed_lps,
1207            num_withdrawn_lps,
1208            num_swaps,
1209            num_swap_claims,
1210            num_txs
1211        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
1212        )
1213        .bind(height)
1214        .bind(time)
1215        .bind(serde_json::to_value(&batch_swap_summaries)?)
1216        .bind(num_opened_lps)
1217        .bind(num_closed_lps)
1218        .bind(num_withdrawn_lps)
1219        .bind(num_swaps)
1220        .bind(num_swap_claims)
1221        .bind(num_txs)
1222        .execute(dbtx.as_mut())
1223        .await?;
1224
1225        Ok(())
1226    }
1227
1228    async fn record_batch_swap_traces(
1229        &self,
1230        dbtx: &mut PgTransaction<'_>,
1231        time: DateTime,
1232        height: i32,
1233        event: &EventBatchSwap,
1234    ) -> anyhow::Result<()> {
1235        let EventBatchSwap {
1236            batch_swap_output_data: _,
1237            swap_execution_1_for_2,
1238            swap_execution_2_for_1,
1239        } = event;
1240
1241        if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
1242            self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
1243                .await?;
1244        }
1245
1246        if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
1247            self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
1248                .await?;
1249        }
1250
1251        Ok(())
1252    }
1253
1254    async fn record_position_execution(
1255        &self,
1256        dbtx: &mut PgTransaction<'_>,
1257        time: DateTime,
1258        height: i32,
1259        event: &EventPositionExecution,
1260        positions: &BTreeMap<PositionId, Position>,
1261    ) -> anyhow::Result<()> {
1262        // Get the position that was executed against
1263        let position = positions
1264            .get(&event.position_id)
1265            .expect("position must exist for execution");
1266        let current = Reserves {
1267            r1: event.reserves_1,
1268            r2: event.reserves_2,
1269        };
1270        let prev = Reserves {
1271            r1: event.prev_reserves_1,
1272            r2: event.prev_reserves_2,
1273        };
1274        let flows = Flows::from_phi_and_reserves(&position.phi, &current, &prev);
1275
1276        // First insert the reserves and get the rowid
1277        let reserves_rowid = sqlx::query_scalar::<_, i32>(
1278            "INSERT INTO dex_ex_position_reserves (
1279                position_id,
1280                height,
1281                time,
1282                reserves_1,
1283                reserves_2
1284            ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1285        )
1286        .bind(event.position_id.0)
1287        .bind(height)
1288        .bind(time)
1289        .bind(BigDecimal::from(event.reserves_1.value()))
1290        .bind(BigDecimal::from(event.reserves_2.value()))
1291        .fetch_one(dbtx.as_mut())
1292        .await?;
1293
1294        // Then record the execution with the reserves_rowid
1295        sqlx::query(
1296            "INSERT INTO dex_ex_position_executions (
1297                position_id,
1298                height,
1299                time,
1300                reserves_rowid,
1301                delta_1,
1302                delta_2,
1303                lambda_1,
1304                lambda_2,
1305                fee_1,
1306                fee_2,
1307                context_asset_start,
1308                context_asset_end
1309            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1310        )
1311        .bind(event.position_id.0)
1312        .bind(height)
1313        .bind(time)
1314        .bind(reserves_rowid)
1315        .bind(BigDecimal::from(flows.delta_1().value()))
1316        .bind(BigDecimal::from(flows.delta_2().value()))
1317        .bind(BigDecimal::from(flows.lambda_1().value()))
1318        .bind(BigDecimal::from(flows.lambda_2().value()))
1319        .bind(BigDecimal::from(flows.fee_1().value()))
1320        .bind(BigDecimal::from(flows.fee_2().value()))
1321        .bind(event.context.start.to_bytes())
1322        .bind(event.context.end.to_bytes())
1323        .execute(dbtx.as_mut())
1324        .await?;
1325
1326        Ok(())
1327    }
1328
1329    async fn record_position_close(
1330        &self,
1331        dbtx: &mut PgTransaction<'_>,
1332        time: DateTime,
1333        height: i32,
1334        tx_hash: Option<[u8; 32]>,
1335        event: &EventPositionClose,
1336    ) -> anyhow::Result<()> {
1337        sqlx::query(
1338            "UPDATE dex_ex_position_state 
1339             SET closing_time = $1,
1340                 closing_height = $2,
1341                 closing_tx = $3
1342             WHERE position_id = $4",
1343        )
1344        .bind(time)
1345        .bind(height)
1346        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1347        .bind(event.position_id.0)
1348        .execute(dbtx.as_mut())
1349        .await?;
1350
1351        Ok(())
1352    }
1353
1354    async fn record_position_withdraw(
1355        &self,
1356        dbtx: &mut PgTransaction<'_>,
1357        time: DateTime,
1358        height: i32,
1359        tx_hash: Option<[u8; 32]>,
1360        event: &EventPositionWithdraw,
1361    ) -> anyhow::Result<()> {
1362        // First insert the final reserves state (zeros after withdrawal)
1363        let reserves_rowid = sqlx::query_scalar::<_, i32>(
1364            "INSERT INTO dex_ex_position_reserves (
1365                position_id,
1366                height,
1367                time,
1368                reserves_1,
1369                reserves_2
1370            ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", // Using $4 twice for zero values
1371        )
1372        .bind(event.position_id.0)
1373        .bind(height)
1374        .bind(time)
1375        .bind(BigDecimal::from(0)) // Both reserves become zero after withdrawal
1376        .fetch_one(dbtx.as_mut())
1377        .await?;
1378
1379        sqlx::query(
1380            "INSERT INTO dex_ex_position_withdrawals (
1381                position_id,
1382                height,
1383                time,
1384                withdrawal_tx,
1385                sequence,
1386                reserves_1,
1387                reserves_2,
1388                reserves_rowid
1389            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1390        )
1391        .bind(event.position_id.0)
1392        .bind(height)
1393        .bind(time)
1394        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1395        .bind(event.sequence as i32)
1396        .bind(BigDecimal::from(event.reserves_1.value()))
1397        .bind(BigDecimal::from(event.reserves_2.value()))
1398        .bind(reserves_rowid)
1399        .execute(dbtx.as_mut())
1400        .await?;
1401
1402        Ok(())
1403    }
1404
1405    async fn record_transaction(
1406        &self,
1407        dbtx: &mut PgTransaction<'_>,
1408        time: DateTime,
1409        height: u64,
1410        transaction_id: [u8; 32],
1411        transaction: Transaction,
1412    ) -> anyhow::Result<()> {
1413        if transaction.transaction_body.actions.is_empty() {
1414            return Ok(());
1415        }
1416        sqlx::query(
1417            "INSERT INTO dex_ex_transactions (
1418                transaction_id,
1419                transaction,
1420                height,
1421                time
1422            ) VALUES ($1, $2, $3, $4)
1423            ",
1424        )
1425        .bind(transaction_id)
1426        .bind(transaction.encode_to_vec())
1427        .bind(i32::try_from(height)?)
1428        .bind(time)
1429        .execute(dbtx.as_mut())
1430        .await?;
1431
1432        Ok(())
1433    }
1434
1435    async fn record_all_transactions(
1436        &self,
1437        dbtx: &mut PgTransaction<'_>,
1438        time: DateTime,
1439        block: &BlockEvents,
1440    ) -> anyhow::Result<()> {
1441        for (tx_id, tx_bytes) in block.transactions() {
1442            let tx = Transaction::try_from(tx_bytes)?;
1443            let height = block.height();
1444            self.record_transaction(dbtx, time, height, tx_id, tx)
1445                .await?;
1446        }
1447        Ok(())
1448    }
1449}
1450
1451#[async_trait]
1452impl AppView for Component {
1453    async fn init_chain(
1454        &self,
1455        dbtx: &mut PgTransaction,
1456        _: &serde_json::Value,
1457    ) -> Result<(), anyhow::Error> {
1458        for statement in include_str!("schema.sql").split(";") {
1459            sqlx::query(statement).execute(dbtx.as_mut()).await?;
1460        }
1461        Ok(())
1462    }
1463
1464    fn name(&self) -> String {
1465        "dex_ex".to_string()
1466    }
1467
1468    async fn index_batch(
1469        &self,
1470        dbtx: &mut PgTransaction,
1471        batch: EventBatch,
1472        ctx: EventBatchContext,
1473    ) -> Result<(), anyhow::Error> {
1474        metadata::set(dbtx, self.denom).await?;
1475        let mut charts = HashMap::new();
1476        let mut snapshots = HashMap::new();
1477        let mut last_time = None;
1478        for block in batch.events_by_block() {
1479            let mut events = Events::extract(&block)?;
1480            let time = events
1481                .time
1482                .expect(&format!("no block root event at height {}", block.height()));
1483            last_time = Some(time);
1484
1485            self.record_all_transactions(dbtx, time, block).await?;
1486
1487            // Load any missing positions before processing events
1488            events.load_positions(dbtx).await?;
1489
1490            // This is where we are going to build the block summary for the DEX.
1491            self.record_block_summary(dbtx, time, block.height() as i32, &events)
1492                .await?;
1493
1494            // Record batch swap execution traces.
1495            for event in &events.batch_swaps {
1496                self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
1497                    .await?;
1498            }
1499
1500            // Record position opens
1501            for event in &events.position_opens {
1502                let tx_hash = events.position_open_txs.get(&event.position_id).copied();
1503                self.record_position_open(dbtx, time, events.height, tx_hash, event)
1504                    .await?;
1505            }
1506
1507            // Process position executions
1508            for event in &events.position_executions {
1509                self.record_position_execution(dbtx, time, events.height, event, &events.positions)
1510                    .await?;
1511            }
1512
1513            // Record position closes
1514            for event in &events.position_closes {
1515                let tx_hash = events.position_close_txs.get(&event.position_id).copied();
1516                self.record_position_close(dbtx, time, events.height, tx_hash, event)
1517                    .await?;
1518            }
1519
1520            // Record position withdrawals
1521            for event in &events.position_withdrawals {
1522                let tx_hash = events
1523                    .position_withdrawal_txs
1524                    .get(&event.position_id)
1525                    .copied();
1526                self.record_position_withdraw(dbtx, time, events.height, tx_hash, event)
1527                    .await?;
1528            }
1529
1530            for (pair, candle) in &events.candles {
1531                for window in Window::all() {
1532                    let key = (pair.start, pair.end, window);
1533                    if !charts.contains_key(&key) {
1534                        let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?;
1535                        charts.insert(key, ctx);
1536                    }
1537                    charts
1538                        .get_mut(&key)
1539                        .unwrap() // safe because we just inserted above
1540                        .update(dbtx, time, *candle)
1541                        .await?;
1542                }
1543            }
1544
1545            let block_pairs = events
1546                .candles
1547                .keys()
1548                .chain(events.metrics.keys())
1549                .copied()
1550                .collect::<HashSet<_>>();
1551            for pair in block_pairs {
1552                if !snapshots.contains_key(&pair) {
1553                    let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?;
1554                    snapshots.insert(pair, ctx);
1555                }
1556                // NOPANIC: inserted above
1557                snapshots
1558                    .get_mut(&pair)
1559                    .unwrap()
1560                    .update(
1561                        dbtx,
1562                        time,
1563                        events.candles.get(&pair).copied(),
1564                        events.metrics.get(&pair).copied().unwrap_or_default(),
1565                        events.price_for(self.denom, pair.start),
1566                    )
1567                    .await?;
1568            }
1569        }
1570
1571        if ctx.is_last() {
1572            if let Some(now) = last_time {
1573                for window in Window::all() {
1574                    summary::update_summaries(dbtx, now, window).await?;
1575                    summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity)
1576                        .await?;
1577                }
1578            }
1579        }
1580        for chart in charts.into_values() {
1581            chart.unload(dbtx).await?;
1582        }
1583        Ok(())
1584    }
1585}