pindexer/dex_ex/
mod.rs

1use anyhow::anyhow;
2use cometindex::{
3    async_trait,
4    index::{BlockEvents, EventBatch, EventBatchContext},
5    AppView, PgTransaction,
6};
7use penumbra_sdk_asset::asset;
8use penumbra_sdk_dex::{
9    event::{
10        EventBatchSwap, EventCandlestickData, EventPositionClose, EventPositionExecution,
11        EventPositionOpen, EventPositionWithdraw, EventQueuePositionClose,
12    },
13    lp::Reserves,
14    DirectedTradingPair, SwapExecution, TradingPair,
15};
16use penumbra_sdk_dex::{
17    event::{EventSwap, EventSwapClaim},
18    lp::position::{Id as PositionId, Position},
19};
20use penumbra_sdk_num::Amount;
21use penumbra_sdk_proto::event::EventDomainType;
22use penumbra_sdk_proto::DomainType;
23use penumbra_sdk_sct::event::EventBlockRoot;
24use penumbra_sdk_transaction::Transaction;
25use sqlx::types::BigDecimal;
26use sqlx::Row;
27use std::collections::{BTreeMap, HashMap, HashSet};
28
29type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
30
31mod candle {
32    use super::DateTime;
33    use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
34    use penumbra_sdk_dex::CandlestickData;
35    use std::fmt::Display;
36
37    fn geo_mean(a: f64, b: f64) -> f64 {
38        (a * b).sqrt()
39    }
40
41    /// Candlestick data, unmoored from the prison of a particular block height.
42    ///
43    /// In other words, this can represent candlesticks which span arbitrary windows,
44    /// and not just a single block.
45    #[derive(Debug, Clone, Copy)]
46    pub struct Candle {
47        pub open: f64,
48        pub close: f64,
49        pub low: f64,
50        pub high: f64,
51        pub direct_volume: f64,
52        pub swap_volume: f64,
53    }
54
55    impl Candle {
56        pub fn from_candlestick_data(data: &CandlestickData) -> Self {
57            // The volume is tracked in terms of input asset.
58            // We can use the closing price (aka. the common clearing price) to convert
59            // the volume to the other direction i.e, the batch swap output.
60            Self {
61                open: data.open,
62                close: data.close,
63                low: data.low,
64                high: data.high,
65                direct_volume: data.direct_volume,
66                swap_volume: data.swap_volume,
67            }
68        }
69
70        pub fn merge(&mut self, that: &Self) {
71            self.close = that.close;
72            self.low = self.low.min(that.low);
73            self.high = self.high.max(that.high);
74            self.direct_volume += that.direct_volume;
75            self.swap_volume += that.swap_volume;
76        }
77
78        /// Mix this candle with a candle going in the opposite direction of the pair.
79        pub fn mix(&mut self, op: &Self) {
80            // We use the geometric mean, resulting in all the prices in a.mix(b) being
81            // the inverse of the prices in b.mix(a), and the volumes being equal.
82            self.close /= geo_mean(self.close, op.close);
83            self.open /= geo_mean(self.open, op.open);
84            self.low = self.low.min(1.0 / op.low);
85            self.high = self.high.min(1.0 / op.high);
86            // Using the closing price to look backwards at volume.
87            self.direct_volume += op.direct_volume / self.close;
88            self.swap_volume += op.swap_volume / self.close;
89        }
90
91        /// Flip this candle to get the equivalent in the other direction.
92        pub fn flip(&self) -> Self {
93            Self {
94                open: 1.0 / self.open,
95                close: 1.0 / self.close,
96                low: 1.0 / self.low,
97                high: 1.0 / self.high,
98                direct_volume: self.direct_volume * self.close,
99                swap_volume: self.swap_volume * self.close,
100            }
101        }
102    }
103
104    impl From<CandlestickData> for Candle {
105        fn from(value: CandlestickData) -> Self {
106            Self::from(&value)
107        }
108    }
109
110    impl From<&CandlestickData> for Candle {
111        fn from(value: &CandlestickData) -> Self {
112            Self::from_candlestick_data(value)
113        }
114    }
115
    /// The time windows over which candles and summaries are aggregated.
    ///
    /// Variants are declared from shortest to longest, so the derived ordering
    /// follows window length.
    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
    pub enum Window {
        /// One minute.
        W1m,
        /// Fifteen minutes.
        W15m,
        /// One hour.
        W1h,
        /// Four hours.
        W4h,
        /// One day.
        W1d,
        /// One week.
        W1w,
        /// One month.
        W1mo,
    }
126
127    impl Window {
128        pub fn all() -> impl Iterator<Item = Self> {
129            [
130                Window::W1m,
131                Window::W15m,
132                Window::W1h,
133                Window::W4h,
134                Window::W1d,
135                Window::W1w,
136                Window::W1mo,
137            ]
138            .into_iter()
139        }
140
141        /// Get the anchor for a given time.
142        ///
143        /// This is the latest time that "snaps" to a given anchor, dependent on the window.
144        ///
145        /// For example, the 1 minute window has an anchor every minute, the day window
146        /// every day, etc.
147        pub fn anchor(&self, time: DateTime) -> DateTime {
148            let (y, mo, d, h, m) = (
149                time.year(),
150                time.month(),
151                time.day(),
152                time.hour(),
153                time.minute(),
154            );
155            let out = match self {
156                Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
157                Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
158                Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
159                Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
160                Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
161                Window::W1w => Utc
162                    .with_ymd_and_hms(y, mo, d, 0, 0, 0)
163                    .single()
164                    .and_then(|x| {
165                        x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
166                    }),
167                Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
168            };
169            out.unwrap()
170        }
171
172        pub fn subtract_from(&self, time: DateTime) -> DateTime {
173            let delta = match self {
174                Window::W1m => TimeDelta::minutes(1),
175                Window::W15m => TimeDelta::minutes(15),
176                Window::W1h => TimeDelta::hours(1),
177                Window::W4h => TimeDelta::hours(4),
178                Window::W1d => TimeDelta::days(1),
179                Window::W1w => TimeDelta::weeks(1),
180                Window::W1mo => TimeDelta::days(30),
181            };
182            time - delta
183        }
184    }
185
186    impl Display for Window {
187        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
188            use Window::*;
189            let str = match self {
190                W1m => "1m",
191                W15m => "15m",
192                W1h => "1h",
193                W4h => "4h",
194                W1d => "1d",
195                W1w => "1w",
196                W1mo => "1mo",
197            };
198            write!(f, "{}", str)
199        }
200    }
201
202    #[derive(Debug)]
203    pub struct WindowedCandle {
204        start: DateTime,
205        window: Window,
206        candle: Candle,
207    }
208
209    impl WindowedCandle {
210        pub fn new(now: DateTime, window: Window, candle: Candle) -> Self {
211            Self {
212                start: window.anchor(now),
213                window,
214                candle,
215            }
216        }
217
218        /// Update with a new candlestick, at a given time.
219        ///
220        /// This may return the old candlestick and its start time, if we should be starting
221        /// a new candle, based on the candle for that window having already been closed.
222        pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> {
223            let start = self.window.anchor(now);
224            // This candle belongs to the next window!
225            if start > self.start {
226                let old_start = std::mem::replace(&mut self.start, start);
227                let old_candle = std::mem::replace(&mut self.candle, candle);
228                Some((old_start, old_candle))
229            } else {
230                self.candle.merge(&candle);
231                None
232            }
233        }
234
235        pub fn window(&self) -> Window {
236            self.window
237        }
238
239        pub fn flush(self) -> (DateTime, Candle) {
240            (self.start, self.candle)
241        }
242    }
243}
244pub use candle::{Candle, Window, WindowedCandle};
245
246mod price_chart {
247    use super::*;
248
    /// A context when processing a price chart.
    ///
    /// Tracks the candle currently being accumulated for one directed pair
    /// (`asset_start` -> `asset_end`) and one window.
    #[derive(Debug)]
    pub struct Context {
        // Start asset of the directed pair this chart is for.
        asset_start: asset::Id,
        // End asset of the directed pair this chart is for.
        asset_end: asset::Id,
        // The window candles are bucketed into.
        window: Window,
        // The candle in progress; `None` until a row is loaded or a candle is seen.
        state: Option<WindowedCandle>,
    }
257
258    impl Context {
259        pub async fn load(
260            dbtx: &mut PgTransaction<'_>,
261            asset_start: asset::Id,
262            asset_end: asset::Id,
263            window: Window,
264        ) -> anyhow::Result<Self> {
265            let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as(
266                "
267                SELECT open, close, high, low, direct_volume, swap_volume, start_time
268                FROM dex_ex_price_charts
269                WHERE asset_start = $1
270                AND asset_end = $2
271                AND the_window = $3
272                ORDER BY start_time DESC
273                LIMIT 1
274            ",
275            )
276            .bind(asset_start.to_bytes())
277            .bind(asset_end.to_bytes())
278            .bind(window.to_string())
279            .fetch_optional(dbtx.as_mut())
280            .await?;
281            let state = row.map(
282                |(open, close, low, high, direct_volume, swap_volume, start)| {
283                    let candle = Candle {
284                        open,
285                        close,
286                        low,
287                        high,
288                        direct_volume,
289                        swap_volume,
290                    };
291                    WindowedCandle::new(start, window, candle)
292                },
293            );
294            Ok(Self {
295                asset_start,
296                asset_end,
297                window,
298                state,
299            })
300        }
301
        /// Upsert one candle into `dex_ex_price_charts`.
        ///
        /// The row is keyed by `(asset_start, asset_end, the_window, start_time)`; if a row
        /// with that key already exists, its prices and volumes are overwritten.
        ///
        /// # Errors
        ///
        /// Returns an error if the statement fails.
        async fn write_candle(
            &self,
            dbtx: &mut PgTransaction<'_>,
            start: DateTime,
            candle: Candle,
        ) -> anyhow::Result<()> {
            sqlx::query(
                "
            INSERT INTO dex_ex_price_charts(
                id, asset_start, asset_end, the_window,
                start_time, open, close, high, low, direct_volume, swap_volume
            ) 
            VALUES(
                DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
            )
            ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET
                open = EXCLUDED.open,
                close = EXCLUDED.close,
                high = EXCLUDED.high,
                low = EXCLUDED.low,
                direct_volume = EXCLUDED.direct_volume,
                swap_volume = EXCLUDED.swap_volume
            ",
            )
            // Binds are positional: their order must match $1..$10 in the column list
            // above (note `high` is bound before `low`).
            .bind(self.asset_start.to_bytes())
            .bind(self.asset_end.to_bytes())
            .bind(self.window.to_string())
            .bind(start)
            .bind(candle.open)
            .bind(candle.close)
            .bind(candle.high)
            .bind(candle.low)
            .bind(candle.direct_volume)
            .bind(candle.swap_volume)
            .execute(dbtx.as_mut())
            .await?;
            Ok(())
        }
340
341        pub async fn update(
342            &mut self,
343            dbtx: &mut PgTransaction<'_>,
344            now: DateTime,
345            candle: Candle,
346        ) -> anyhow::Result<()> {
347            let state = match self.state.as_mut() {
348                Some(x) => x,
349                None => {
350                    self.state = Some(WindowedCandle::new(now, self.window, candle));
351                    self.state.as_mut().unwrap()
352                }
353            };
354            if let Some((start, old_candle)) = state.with_candle(now, candle) {
355                self.write_candle(dbtx, start, old_candle).await?;
356            };
357            Ok(())
358        }
359
360        pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
361            let state = std::mem::replace(&mut self.state, None);
362            if let Some(state) = state {
363                let (start, candle) = state.flush();
364                self.write_candle(dbtx, start, candle).await?;
365            }
366            Ok(())
367        }
368    }
369}
370
371use price_chart::Context as PriceChartContext;
372
373mod summary {
374    use cometindex::PgTransaction;
375    use penumbra_sdk_asset::asset;
376
377    use super::{Candle, DateTime, PairMetrics, Window};
378
    /// Running per-pair state used to write rows of `dex_ex_pairs_block_snapshot`.
    pub struct Context {
        // Start asset of the directed pair.
        start: asset::Id,
        // End asset of the directed pair.
        end: asset::Id,
        // Most recent closing price seen for the pair.
        price: f64,
        // Running liquidity, updated by each block's liquidity change.
        liquidity: f64,
        // Last supplied value for the snapshot's `start_price_indexing_denom` column.
        start_price_indexing_denom: f64,
    }
386
387    impl Context {
388        pub async fn load(
389            dbtx: &mut PgTransaction<'_>,
390            start: asset::Id,
391            end: asset::Id,
392        ) -> anyhow::Result<Self> {
393            let row: Option<(f64, f64, f64)> = sqlx::query_as(
394                "
395                SELECT price, liquidity, start_price_indexing_denom
396                FROM dex_ex_pairs_block_snapshot
397                WHERE asset_start = $1
398                AND asset_end = $2
399                ORDER BY id DESC
400                LIMIT 1
401            ",
402            )
403            .bind(start.to_bytes())
404            .bind(end.to_bytes())
405            .fetch_optional(dbtx.as_mut())
406            .await?;
407            let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
408            Ok(Self {
409                start,
410                end,
411                price,
412                liquidity,
413                start_price_indexing_denom,
414            })
415        }
416
417        pub async fn update(
418            &mut self,
419            dbtx: &mut PgTransaction<'_>,
420            now: DateTime,
421            candle: Option<Candle>,
422            metrics: PairMetrics,
423            start_price_indexing_denom: Option<f64>,
424        ) -> anyhow::Result<()> {
425            if let Some(candle) = candle {
426                self.price = candle.close;
427            }
428            if let Some(price) = start_price_indexing_denom {
429                self.start_price_indexing_denom = price;
430            }
431            self.liquidity += metrics.liquidity_change;
432
433            sqlx::query(
434                "
435            INSERT INTO dex_ex_pairs_block_snapshot VALUES (
436                DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
437            )        
438        ",
439            )
440            .bind(now)
441            .bind(self.start.to_bytes())
442            .bind(self.end.to_bytes())
443            .bind(self.price)
444            .bind(self.liquidity)
445            .bind(candle.map(|x| x.direct_volume).unwrap_or_default())
446            .bind(candle.map(|x| x.swap_volume).unwrap_or_default())
447            .bind(self.start_price_indexing_denom)
448            .bind(metrics.trades)
449            .execute(dbtx.as_mut())
450            .await?;
451            Ok(())
452        }
453    }
454
    /// Recompute the `dex_ex_pairs_summary` row of every pair for one window.
    ///
    /// For each distinct `(asset_start, asset_end)` found in
    /// `dex_ex_pairs_block_snapshot`, this upserts a summary combining:
    /// - the latest snapshot at or before `now` (current price and liquidity),
    /// - the latest snapshot at or before the start of the window (price and liquidity
    ///   "then", defaulting to 0.0 when there is none),
    /// - sums, minimum and maximum over the snapshots inside the window.
    ///
    /// A pair with no snapshot at or before `now` produces no row, since the final
    /// SELECT joins against the `now` CTE.
    ///
    /// # Errors
    ///
    /// Returns an error if any query fails.
    pub async fn update_summaries(
        dbtx: &mut PgTransaction<'_>,
        now: DateTime,
        window: Window,
    ) -> anyhow::Result<()> {
        // Start of the window, i.e. `now` minus the window length.
        let then = window.subtract_from(now);
        let pairs = sqlx::query_as::<_, (Vec<u8>, Vec<u8>)>(
            "SELECT asset_start, asset_end FROM dex_ex_pairs_block_snapshot GROUP BY asset_start, asset_end",
        )
        .fetch_all(dbtx.as_mut())
        .await?;
        // One upsert per pair. Placeholders: $1 = start, $2 = end, $3 = now, $4 = then,
        // $5 = window label.
        // NOTE(review): `low` and `high` are part of the inserted row, but are not listed
        // in the `DO UPDATE SET` clause, so an existing summary row appears to keep its
        // original low/high. Confirm against the table schema whether they should be
        // refreshed as well.
        for (start, end) in pairs {
            sqlx::query(
                "
            WITH
            snapshots AS (
                SELECT *
                FROM dex_ex_pairs_block_snapshot
                WHERE asset_start = $1
                AND asset_end = $2
            ),
            previous AS (
                SELECT price AS price_then, liquidity AS liquidity_then
                FROM snapshots
                WHERE time <= $4
                ORDER BY time DESC
                LIMIT 1
            ),
            previous_or_default AS (
                SELECT
                    COALESCE((SELECT price_then FROM previous), 0.0) AS price_then,
                    COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then
            ),
            now AS (
                SELECT price, liquidity            
                FROM snapshots
                WHERE time <= $3
                ORDER BY time DESC
                LIMIT 1
            ),
            sums AS (
                SELECT
                    COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
                    COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
                    COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
                    COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
                    COALESCE(SUM(trades), 0.0) AS trades_over_window,
                    COALESCE(MIN(price), 0.0) AS low,
                    COALESCE(MAX(price), 0.0) AS high
                FROM snapshots
                WHERE time <= $3
                AND time >= $4
            )
            INSERT INTO dex_ex_pairs_summary
            SELECT 
                $1, $2, $5,
                price, price_then,
                low, high,
                liquidity, liquidity_then,
                direct_volume_over_window,
                swap_volume_over_window,
                direct_volume_indexing_denom_over_window,
                swap_volume_indexing_denom_over_window,
                trades_over_window
            FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
            ON CONFLICT (asset_start, asset_end, the_window)
            DO UPDATE SET
                price = EXCLUDED.price,
                price_then = EXCLUDED.price_then,
                liquidity = EXCLUDED.liquidity,
                liquidity_then = EXCLUDED.liquidity_then,
                direct_volume_over_window = EXCLUDED.direct_volume_over_window,
                swap_volume_over_window = EXCLUDED.swap_volume_over_window,
                direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
                swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
                trades_over_window = EXCLUDED.trades_over_window
            ",
            )
            .bind(start)
            .bind(end)
            .bind(now)
            .bind(then)
            .bind(window.to_string())
            .execute(dbtx.as_mut())
            .await?;
        }
        Ok(())
    }
543
    /// Recompute the single `dex_ex_aggregate_summary` row for one window.
    ///
    /// The row is derived entirely from `dex_ex_pairs_summary`:
    /// - `eligible_denoms`: assets that have a summary row ending in `denom` with at
    ///   least `min_liquidity`, together with their price, plus `denom` itself at 1.0;
    /// - `converted_pairs_summary`: the pairs of this window whose start and end assets
    ///   are both eligible, with liquidity multiplied by the end asset's price;
    /// - totals over those pairs, the pairs with the largest swap and direct volume,
    ///   and the pair with the largest percentage price change.
    ///
    /// If no pair qualifies, the `largest_*` and `top_price_mover` CTEs are empty, the
    /// final joins yield no row, and nothing is inserted or updated.
    ///
    /// # Errors
    ///
    /// Returns an error if the statement fails.
    pub async fn update_aggregate_summary(
        dbtx: &mut PgTransaction<'_>,
        window: Window,
        denom: asset::Id,
        min_liquidity: f64,
    ) -> anyhow::Result<()> {
        // TODO: do something here
        // Placeholders: $1 = denom, $2 = min_liquidity, $3 = window label.
        // NOTE(review): `eligible_denoms` is not filtered by window, so an asset whose
        // summary price differs between windows would appear more than once and
        // multiply rows in the joins below -- confirm this cannot happen.
        sqlx::query(
            "
        WITH       
            eligible_denoms AS (
                SELECT asset_start as asset, price
                FROM dex_ex_pairs_summary
                WHERE asset_end = $1
                AND liquidity >= $2
                UNION VALUES ($1, 1.0)
            ),
            converted_pairs_summary AS (
                SELECT
                    asset_start, asset_end,
                    (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
                    liquidity * ed_end.price AS liquidity,
                    direct_volume_indexing_denom_over_window AS dv,
                    swap_volume_indexing_denom_over_window AS sv,
                    trades_over_window as trades
                FROM dex_ex_pairs_summary
                JOIN eligible_denoms AS ed_end
                ON ed_end.asset = asset_end
                JOIN eligible_denoms AS ed_start
                ON ed_start.asset = asset_start
                WHERE the_window = $3
            ),
            sums AS (
                SELECT
                    SUM(dv) AS direct_volume,
                    SUM(sv) AS swap_volume,
                    SUM(liquidity) AS liquidity,
                    SUM(trades) AS trades,
                    (SELECT COUNT(*) FROM converted_pairs_summary WHERE dv > 0 OR sv > 0) AS active_pairs
                FROM converted_pairs_summary
            ),
            largest_sv AS (
                SELECT
                    asset_start AS largest_sv_trading_pair_start,
                    asset_end AS largest_sv_trading_pair_end,
                    sv AS largest_sv_trading_pair_volume                    
                FROM converted_pairs_summary
                ORDER BY sv DESC
                LIMIT 1
            ),
            largest_dv AS (
                SELECT
                    asset_start AS largest_dv_trading_pair_start,
                    asset_end AS largest_dv_trading_pair_end,
                    dv AS largest_dv_trading_pair_volume                    
                FROM converted_pairs_summary
                ORDER BY dv DESC
                LIMIT 1
            ),
            top_price_mover AS (
                SELECT
                    asset_start AS top_price_mover_start,
                    asset_end AS top_price_mover_end,
                    price_change AS top_price_mover_change_percent
                FROM converted_pairs_summary
                ORDER BY price_change DESC
                LIMIT 1
            )
        INSERT INTO dex_ex_aggregate_summary
        SELECT
            $3,
            direct_volume, swap_volume, liquidity, trades, active_pairs,
            largest_sv_trading_pair_start,
            largest_sv_trading_pair_end,
            largest_sv_trading_pair_volume,
            largest_dv_trading_pair_start,
            largest_dv_trading_pair_end,
            largest_dv_trading_pair_volume,
            top_price_mover_start,
            top_price_mover_end,
            top_price_mover_change_percent
        FROM
            sums
        JOIN largest_sv ON TRUE
        JOIN largest_dv ON TRUE
        JOIN top_price_mover ON TRUE
        ON CONFLICT (the_window) DO UPDATE SET
            direct_volume = EXCLUDED.direct_volume,
            swap_volume = EXCLUDED.swap_volume,
            liquidity = EXCLUDED.liquidity,
            trades = EXCLUDED.trades,
            active_pairs = EXCLUDED.active_pairs,          
            largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start,
            largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end,
            largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume,
            largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start,
            largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end,
            largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume,
            top_price_mover_start = EXCLUDED.top_price_mover_start,
            top_price_mover_end = EXCLUDED.top_price_mover_end,
            top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent
        ")
        .bind(denom.to_bytes())
        .bind(min_liquidity)
        .bind(window.to_string())
        .execute(dbtx.as_mut())
        .await?;
        Ok(())
    }
653}
654
655mod metadata {
656    use super::*;
657
658    pub async fn set(dbtx: &mut PgTransaction<'_>, quote_asset: asset::Id) -> anyhow::Result<()> {
659        sqlx::query(
660            "
661        INSERT INTO dex_ex_metadata
662        VALUES (1, $1)
663        ON CONFLICT (id) DO UPDATE 
664        SET id = EXCLUDED.id,
665            quote_asset_id = EXCLUDED.quote_asset_id
666        ",
667        )
668        .bind(quote_asset.to_bytes())
669        .execute(dbtx.as_mut())
670        .await?;
671        Ok(())
672    }
673}
674
/// Per-block metrics accumulated for one directed trading pair.
#[derive(Debug, Default, Clone, Copy)]
struct PairMetrics {
    // Number of trades counted for the pair; incremented by 1.0 per trade.
    trades: f64,
    // Net change in liquidity for the pair, summed over the block's reserve changes.
    liquidity_change: f64,
}
680
/// Serializable summary of a batch swap for one directed pair.
///
/// NOTE(review): this type is not constructed in the visible part of the file; the
/// field notes below are read off the names and types only -- confirm against its use.
#[derive(Debug, Clone, serde::Serialize)]
struct BatchSwapSummary {
    // Directed pair the summary is for.
    asset_start: asset::Id,
    asset_end: asset::Id,
    // Presumably the total input and output amounts of the batch.
    input: Amount,
    output: Amount,
    // Presumably the number of individual swaps in the batch.
    num_swaps: i32,
    // Presumably the execution price as a float.
    price_float: f64,
}
690
/// Everything the dex explorer extracts from the events of a single block.
#[derive(Debug)]
struct Events {
    // Block timestamp, taken from the block root event; `None` until one is seen.
    time: Option<DateTime>,
    // Height of the block the events came from.
    height: i32,
    // Candles keyed by directed pair; both directions of a pair are populated.
    candles: HashMap<DirectedTradingPair, Candle>,
    // Trade counts and liquidity changes, keyed by directed pair.
    metrics: HashMap<DirectedTradingPair, PairMetrics>,
    // Relevant positions. Positions opened in this block are recorded here, since
    // they would not already be in the database.
    positions: BTreeMap<PositionId, Position>,
    // Store events
    position_opens: Vec<EventPositionOpen>,
    position_executions: Vec<EventPositionExecution>,
    position_closes: Vec<EventPositionClose>,
    position_withdrawals: Vec<EventPositionWithdraw>,
    batch_swaps: Vec<EventBatchSwap>,
    swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
    swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
    // Track transaction hashes by position ID
    position_open_txs: BTreeMap<PositionId, [u8; 32]>,
    position_close_txs: BTreeMap<PositionId, [u8; 32]>,
    position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
}
712
713impl Events {
714    fn new() -> Self {
715        Self {
716            time: None,
717            height: 0,
718            candles: HashMap::new(),
719            metrics: HashMap::new(),
720            positions: BTreeMap::new(),
721            position_opens: Vec::new(),
722            position_executions: Vec::new(),
723            position_closes: Vec::new(),
724            position_withdrawals: Vec::new(),
725            batch_swaps: Vec::new(),
726            swaps: BTreeMap::new(),
727            swap_claims: BTreeMap::new(),
728            position_open_txs: BTreeMap::new(),
729            position_close_txs: BTreeMap::new(),
730            position_withdrawal_txs: BTreeMap::new(),
731        }
732    }
733
734    fn with_time(&mut self, time: DateTime) {
735        self.time = Some(time)
736    }
737
738    fn with_candle(&mut self, pair: DirectedTradingPair, candle: Candle) {
739        // Populate both this pair and the flipped pair, and if the flipped pair
740        // is already populated, we need to mix the two candles together.
741        let flip = pair.flip();
742        let new_candle = match self.candles.get(&flip).cloned() {
743            None => candle,
744            Some(flipped) => {
745                let mut out = candle;
746                out.mix(&flipped);
747                out
748            }
749        };
750
751        self.candles.insert(pair, new_candle);
752        self.candles.insert(flip, new_candle.flip());
753    }
754
755    fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
756        if !self.metrics.contains_key(pair) {
757            self.metrics.insert(*pair, PairMetrics::default());
758        }
759        // NOPANIC: inserted above.
760        self.metrics.get_mut(pair).unwrap()
761    }
762
763    fn with_trade(&mut self, pair: &DirectedTradingPair) {
764        self.metric(pair).trades += 1.0;
765    }
766
767    fn with_reserve_change(
768        &mut self,
769        pair: &TradingPair,
770        old_reserves: Option<Reserves>,
771        new_reserves: Reserves,
772        removed: bool,
773    ) {
774        let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
775            (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
776            (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
777            (_, Some(old), new) => (
778                (new.r1.value() as f64) - (old.r1.value() as f64),
779                (new.r2.value() as f64) - (old.r2.value() as f64),
780            ),
781        };
782        for (d_pair, diff) in [
783            (
784                DirectedTradingPair {
785                    start: pair.asset_1(),
786                    end: pair.asset_2(),
787                },
788                diff_2,
789            ),
790            (
791                DirectedTradingPair {
792                    start: pair.asset_2(),
793                    end: pair.asset_1(),
794                },
795                diff_1,
796            ),
797        ] {
798            self.metric(&d_pair).liquidity_change += diff;
799        }
800    }
801
802    pub fn extract(block: &BlockEvents) -> anyhow::Result<Self> {
803        let mut out = Self::new();
804        out.height = block.height() as i32;
805
806        for event in block.events() {
807            if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
808                let candle = Candle::from_candlestick_data(&e.stick);
809                out.with_candle(e.pair, candle);
810            } else if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
811                let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
812                    "creating timestamp should succeed; timestamp: {}",
813                    e.timestamp_seconds
814                ))?;
815                out.with_time(time);
816            } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
817                out.with_reserve_change(
818                    &e.trading_pair,
819                    None,
820                    Reserves {
821                        r1: e.reserves_1,
822                        r2: e.reserves_2,
823                    },
824                    false,
825                );
826                if let Some(tx_hash) = event.tx_hash() {
827                    out.position_open_txs.insert(e.position_id, tx_hash);
828                }
829                // A newly opened position might be executed against in this block,
830                // but wouldn't already be in the database. Adding it here ensures
831                // it's available.
832                out.positions.insert(e.position_id, e.position.clone());
833                out.position_opens.push(e);
834            } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
                // TODO: use close positions to track liquidity more precisely; in practice I (ck) expect few
                // positions to close without being withdrawn.
837                out.with_reserve_change(
838                    &e.trading_pair,
839                    None,
840                    Reserves {
841                        r1: e.reserves_1,
842                        r2: e.reserves_2,
843                    },
844                    true,
845                );
846                if let Some(tx_hash) = event.tx_hash() {
847                    out.position_withdrawal_txs.insert(e.position_id, tx_hash);
848                }
849                out.position_withdrawals.push(e);
850            } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
851                out.with_reserve_change(
852                    &e.trading_pair,
853                    Some(Reserves {
854                        r1: e.prev_reserves_1,
855                        r2: e.prev_reserves_2,
856                    }),
857                    Reserves {
858                        r1: e.reserves_1,
859                        r2: e.reserves_2,
860                    },
861                    false,
862                );
863                if e.reserves_1 > e.prev_reserves_1 {
864                    // Whatever asset we ended up with more with was traded in.
865                    out.with_trade(&DirectedTradingPair {
866                        start: e.trading_pair.asset_1(),
867                        end: e.trading_pair.asset_2(),
868                    });
869                } else if e.reserves_2 > e.prev_reserves_2 {
870                    out.with_trade(&DirectedTradingPair {
871                        start: e.trading_pair.asset_2(),
872                        end: e.trading_pair.asset_1(),
873                    });
874                }
875                out.position_executions.push(e);
876            } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) {
877                out.position_closes.push(e);
878            } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
879                // The position close event is emitted by the dex module at EOB,
880                // so we need to track it with the tx hash of the closure tx.
881                if let Some(tx_hash) = event.tx_hash() {
882                    out.position_close_txs.insert(e.position_id, tx_hash);
883                }
884            } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
885                out.batch_swaps.push(e);
886            } else if let Ok(e) = EventSwap::try_from_event(&event.event) {
887                out.swaps
888                    .entry(e.trading_pair)
889                    .or_insert_with(Vec::new)
890                    .push(e);
891            } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
892                out.swap_claims
893                    .entry(e.trading_pair)
894                    .or_insert_with(Vec::new)
895                    .push(e);
896            }
897        }
898        Ok(out)
899    }
900
901    async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
902        // Collect position IDs that we need but don't already have
903        let missing_positions: Vec<_> = self
904            .position_executions
905            .iter()
906            .map(|e| e.position_id)
907            .filter(|id| !self.positions.contains_key(id))
908            .collect();
909
910        if missing_positions.is_empty() {
911            return Ok(());
912        }
913
914        // Load missing positions from database
915        let rows = sqlx::query(
916            "SELECT position_raw 
917             FROM dex_ex_position_state 
918             WHERE position_id = ANY($1)",
919        )
920        .bind(
921            &missing_positions
922                .iter()
923                .map(|id| id.0.as_ref())
924                .collect::<Vec<_>>(),
925        )
926        .fetch_all(dbtx.as_mut())
927        .await?;
928
929        // Decode and store each position
930        for row in rows {
931            let position_raw: Vec<u8> = row.get("position_raw");
932            let position = Position::decode(position_raw.as_slice())?;
933            self.positions.insert(position.id(), position);
934        }
935
936        Ok(())
937    }
938
939    /// Attempt to find the price, relative to a given indexing denom, for a particular asset, in this block.
940    pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
941        self.candles
942            .get(&DirectedTradingPair::new(asset, indexing_denom))
943            .map(|x| x.close)
944    }
945}
946
/// The `dex_ex` app view: indexes DEX events (candles, positions, swaps,
/// transactions) into the `dex_ex_*` tables.
#[derive(Debug)]
pub struct Component {
    /// Asset used as the indexing denom: it is stored via `metadata::set`,
    /// passed to `Events::price_for` as the denom prices are quoted in, and
    /// forwarded to `summary::update_aggregate_summary`.
    denom: asset::Id,
    /// Liquidity threshold forwarded to `summary::update_aggregate_summary`.
    /// NOTE(review): the exact filtering semantics live in `summary` — confirm there.
    min_liquidity: f64,
}
952
953impl Component {
954    pub fn new(denom: asset::Id, min_liquidity: f64) -> Self {
955        Self {
956            denom,
957            min_liquidity,
958        }
959    }
960
961    async fn record_position_open(
962        &self,
963        dbtx: &mut PgTransaction<'_>,
964        time: DateTime,
965        height: i32,
966        tx_hash: Option<[u8; 32]>,
967        event: &EventPositionOpen,
968    ) -> anyhow::Result<()> {
969        // Get effective prices by orienting the trading function in each direction
970        let effective_price_1_to_2: f64 = event
971            .position
972            .phi
973            .orient_start(event.trading_pair.asset_1())
974            .expect("position trading pair matches")
975            .effective_price()
976            .into();
977
978        let effective_price_2_to_1: f64 = event
979            .position
980            .phi
981            .orient_start(event.trading_pair.asset_2())
982            .expect("position trading pair matches")
983            .effective_price()
984            .into();
985
986        // First insert initial reserves and get the rowid
987        let opening_reserves_rowid = sqlx::query_scalar::<_, i32>(
988            "INSERT INTO dex_ex_position_reserves (
989                position_id,
990                height,
991                time,
992                reserves_1,
993                reserves_2
994            ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
995        )
996        .bind(event.position_id.0)
997        .bind(height)
998        .bind(time)
999        .bind(BigDecimal::from(event.reserves_1.value()))
1000        .bind(BigDecimal::from(event.reserves_2.value()))
1001        .fetch_one(dbtx.as_mut())
1002        .await?;
1003
1004        // Then insert position state with the opening_reserves_rowid
1005        sqlx::query(
1006            "INSERT INTO dex_ex_position_state (
1007                position_id,
1008                asset_1,
1009                asset_2,
1010                p,
1011                q,
1012                close_on_fill,
1013                fee_bps,
1014                effective_price_1_to_2,
1015                effective_price_2_to_1,
1016                position_raw,
1017                opening_time,
1018                opening_height,
1019                opening_tx,
1020                opening_reserves_rowid
1021            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1022        )
1023        .bind(event.position_id.0)
1024        .bind(event.trading_pair.asset_1().to_bytes())
1025        .bind(event.trading_pair.asset_2().to_bytes())
1026        .bind(BigDecimal::from(event.position.phi.component.p.value()))
1027        .bind(BigDecimal::from(event.position.phi.component.q.value()))
1028        .bind(event.position.close_on_fill)
1029        .bind(event.trading_fee as i32)
1030        .bind(effective_price_1_to_2)
1031        .bind(effective_price_2_to_1)
1032        .bind(event.position.encode_to_vec())
1033        .bind(time)
1034        .bind(height)
1035        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1036        .bind(opening_reserves_rowid)
1037        .execute(dbtx.as_mut())
1038        .await?;
1039
1040        Ok(())
1041    }
1042
1043    async fn record_swap_execution_traces(
1044        &self,
1045        dbtx: &mut PgTransaction<'_>,
1046        time: DateTime,
1047        height: i32,
1048        swap_execution: &SwapExecution,
1049    ) -> anyhow::Result<()> {
1050        let SwapExecution {
1051            traces,
1052            input: se_input,
1053            output: se_output,
1054        } = swap_execution;
1055
1056        let asset_start = se_input.asset_id;
1057        let asset_end = se_output.asset_id;
1058        let batch_input = se_input.amount;
1059        let batch_output = se_output.amount;
1060
1061        for trace in traces.iter() {
1062            let Some(input_value) = trace.first() else {
1063                continue;
1064            };
1065            let Some(output_value) = trace.last() else {
1066                continue;
1067            };
1068
1069            let input = input_value.amount;
1070            let output = output_value.amount;
1071
1072            let price_float = (output.value() as f64) / (input.value() as f64);
1073            let amount_hops = trace
1074                .iter()
1075                .map(|x| BigDecimal::from(x.amount.value()))
1076                .collect::<Vec<_>>();
1077            let position_id_hops: Vec<[u8; 32]> = vec![];
1078            let asset_hops = trace
1079                .iter()
1080                .map(|x| x.asset_id.to_bytes())
1081                .collect::<Vec<_>>();
1082
1083            sqlx::query(
1084                "INSERT INTO dex_ex_batch_swap_traces (
1085                height,
1086                time,
1087                input,
1088                output,
1089                batch_input,
1090                batch_output,
1091                price_float,
1092                asset_start,
1093                asset_end,
1094                asset_hops,
1095                amount_hops,
1096               position_id_hops
1097            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1098            )
1099            .bind(height)
1100            .bind(time)
1101            .bind(BigDecimal::from(input.value()))
1102            .bind(BigDecimal::from(output.value()))
1103            .bind(BigDecimal::from(batch_input.value()))
1104            .bind(BigDecimal::from(batch_output.value()))
1105            .bind(price_float)
1106            .bind(asset_start.to_bytes())
1107            .bind(asset_end.to_bytes())
1108            .bind(asset_hops)
1109            .bind(amount_hops)
1110            .bind(position_id_hops)
1111            .execute(dbtx.as_mut())
1112            .await?;
1113        }
1114
1115        Ok(())
1116    }
1117
1118    async fn record_block_summary(
1119        &self,
1120        dbtx: &mut PgTransaction<'_>,
1121        time: DateTime,
1122        height: i32,
1123        events: &Events,
1124    ) -> anyhow::Result<()> {
1125        let num_opened_lps = events.position_opens.len() as i32;
1126        let num_closed_lps = events.position_closes.len() as i32;
1127        let num_withdrawn_lps = events.position_withdrawals.len() as i32;
1128        let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
1129        let num_swap_claims = events
1130            .swap_claims
1131            .iter()
1132            .map(|(_, v)| v.len())
1133            .sum::<usize>() as i32;
1134        let num_txs = events.batch_swaps.len() as i32;
1135
1136        let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();
1137
1138        for event in &events.batch_swaps {
1139            let trading_pair = event.batch_swap_output_data.trading_pair;
1140
1141            if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
1142                let asset_start = swap_1_2.input.asset_id;
1143                let asset_end = swap_1_2.output.asset_id;
1144                let input = swap_1_2.input.amount;
1145                let output = swap_1_2.output.amount;
1146                let price_float = (output.value() as f64) / (input.value() as f64);
1147
1148                let empty_vec = vec![];
1149                let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1150                let filtered_swaps: Vec<_> = swaps_for_pair
1151                    .iter()
1152                    .filter(|swap| swap.delta_1_i != Amount::zero())
1153                    .collect::<Vec<_>>();
1154                let num_swaps = filtered_swaps.len() as i32;
1155
1156                batch_swap_summaries.push(BatchSwapSummary {
1157                    asset_start,
1158                    asset_end,
1159                    input,
1160                    output,
1161                    num_swaps,
1162                    price_float,
1163                });
1164            }
1165
1166            if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
1167                let asset_start = swap_2_1.input.asset_id;
1168                let asset_end = swap_2_1.output.asset_id;
1169                let input = swap_2_1.input.amount;
1170                let output = swap_2_1.output.amount;
1171                let price_float = (output.value() as f64) / (input.value() as f64);
1172
1173                let empty_vec = vec![];
1174                let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1175                let filtered_swaps: Vec<_> = swaps_for_pair
1176                    .iter()
1177                    .filter(|swap| swap.delta_2_i != Amount::zero())
1178                    .collect::<Vec<_>>();
1179                let num_swaps = filtered_swaps.len() as i32;
1180
1181                batch_swap_summaries.push(BatchSwapSummary {
1182                    asset_start,
1183                    asset_end,
1184                    input,
1185                    output,
1186                    num_swaps,
1187                    price_float,
1188                });
1189            }
1190        }
1191
1192        sqlx::query(
1193            "INSERT INTO dex_ex_block_summary (
1194            height,
1195            time,
1196            batch_swaps,
1197            num_open_lps,
1198            num_closed_lps,
1199            num_withdrawn_lps,
1200            num_swaps,
1201            num_swap_claims,
1202            num_txs
1203        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
1204        )
1205        .bind(height)
1206        .bind(time)
1207        .bind(serde_json::to_value(&batch_swap_summaries)?)
1208        .bind(num_opened_lps)
1209        .bind(num_closed_lps)
1210        .bind(num_withdrawn_lps)
1211        .bind(num_swaps)
1212        .bind(num_swap_claims)
1213        .bind(num_txs)
1214        .execute(dbtx.as_mut())
1215        .await?;
1216
1217        Ok(())
1218    }
1219
1220    async fn record_batch_swap_traces(
1221        &self,
1222        dbtx: &mut PgTransaction<'_>,
1223        time: DateTime,
1224        height: i32,
1225        event: &EventBatchSwap,
1226    ) -> anyhow::Result<()> {
1227        let EventBatchSwap {
1228            batch_swap_output_data: _,
1229            swap_execution_1_for_2,
1230            swap_execution_2_for_1,
1231        } = event;
1232
1233        if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
1234            self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
1235                .await?;
1236        }
1237
1238        if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
1239            self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
1240                .await?;
1241        }
1242
1243        Ok(())
1244    }
1245
1246    async fn record_position_execution(
1247        &self,
1248        dbtx: &mut PgTransaction<'_>,
1249        time: DateTime,
1250        height: i32,
1251        event: &EventPositionExecution,
1252        positions: &BTreeMap<PositionId, Position>,
1253    ) -> anyhow::Result<()> {
1254        // Get the position that was executed against
1255        let position = positions
1256            .get(&event.position_id)
1257            .expect("position must exist for execution");
1258
1259        // Determine trade direction and compute deltas
1260        let (delta_1, delta_2, lambda_1, lambda_2) = if event.reserves_1 > event.prev_reserves_1 {
1261            // Asset 1 was input
1262            let delta_1 = event.reserves_1 - event.prev_reserves_1;
1263            let lambda_2 = event.prev_reserves_2 - event.reserves_2;
1264            (delta_1, Amount::zero(), Amount::zero(), lambda_2)
1265        } else {
1266            // Asset 2 was input
1267            let delta_2 = event.reserves_2 - event.prev_reserves_2;
1268            let lambda_1 = event.prev_reserves_1 - event.reserves_1;
1269            (Amount::zero(), delta_2, lambda_1, Amount::zero())
1270        };
1271
1272        // Compute fees directly from input amounts using u128 arithmetic
1273        let fee_bps = position.phi.component.fee as u128;
1274        let fee_1 = (delta_1.value() * fee_bps) / 10_000u128;
1275        let fee_2 = (delta_2.value() * fee_bps) / 10_000u128;
1276
1277        // First insert the reserves and get the rowid
1278        let reserves_rowid = sqlx::query_scalar::<_, i32>(
1279            "INSERT INTO dex_ex_position_reserves (
1280                position_id,
1281                height,
1282                time,
1283                reserves_1,
1284                reserves_2
1285            ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1286        )
1287        .bind(event.position_id.0)
1288        .bind(height)
1289        .bind(time)
1290        .bind(BigDecimal::from(event.reserves_1.value()))
1291        .bind(BigDecimal::from(event.reserves_2.value()))
1292        .fetch_one(dbtx.as_mut())
1293        .await?;
1294
1295        // Then record the execution with the reserves_rowid
1296        sqlx::query(
1297            "INSERT INTO dex_ex_position_executions (
1298                position_id,
1299                height,
1300                time,
1301                reserves_rowid,
1302                delta_1,
1303                delta_2,
1304                lambda_1,
1305                lambda_2,
1306                fee_1,
1307                fee_2,
1308                context_asset_start,
1309                context_asset_end
1310            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1311        )
1312        .bind(event.position_id.0)
1313        .bind(height)
1314        .bind(time)
1315        .bind(reserves_rowid)
1316        .bind(BigDecimal::from(delta_1.value()))
1317        .bind(BigDecimal::from(delta_2.value()))
1318        .bind(BigDecimal::from(lambda_1.value()))
1319        .bind(BigDecimal::from(lambda_2.value()))
1320        .bind(BigDecimal::from(fee_1))
1321        .bind(BigDecimal::from(fee_2))
1322        .bind(event.context.start.to_bytes())
1323        .bind(event.context.end.to_bytes())
1324        .execute(dbtx.as_mut())
1325        .await?;
1326
1327        Ok(())
1328    }
1329
1330    async fn record_position_close(
1331        &self,
1332        dbtx: &mut PgTransaction<'_>,
1333        time: DateTime,
1334        height: i32,
1335        tx_hash: Option<[u8; 32]>,
1336        event: &EventPositionClose,
1337    ) -> anyhow::Result<()> {
1338        sqlx::query(
1339            "UPDATE dex_ex_position_state 
1340             SET closing_time = $1,
1341                 closing_height = $2,
1342                 closing_tx = $3
1343             WHERE position_id = $4",
1344        )
1345        .bind(time)
1346        .bind(height)
1347        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1348        .bind(event.position_id.0)
1349        .execute(dbtx.as_mut())
1350        .await?;
1351
1352        Ok(())
1353    }
1354
1355    async fn record_position_withdraw(
1356        &self,
1357        dbtx: &mut PgTransaction<'_>,
1358        time: DateTime,
1359        height: i32,
1360        tx_hash: Option<[u8; 32]>,
1361        event: &EventPositionWithdraw,
1362    ) -> anyhow::Result<()> {
1363        // First insert the final reserves state (zeros after withdrawal)
1364        let reserves_rowid = sqlx::query_scalar::<_, i32>(
1365            "INSERT INTO dex_ex_position_reserves (
1366                position_id,
1367                height,
1368                time,
1369                reserves_1,
1370                reserves_2
1371            ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", // Using $4 twice for zero values
1372        )
1373        .bind(event.position_id.0)
1374        .bind(height)
1375        .bind(time)
1376        .bind(BigDecimal::from(0)) // Both reserves become zero after withdrawal
1377        .fetch_one(dbtx.as_mut())
1378        .await?;
1379
1380        sqlx::query(
1381            "INSERT INTO dex_ex_position_withdrawals (
1382                position_id,
1383                height,
1384                time,
1385                withdrawal_tx,
1386                sequence,
1387                reserves_1,
1388                reserves_2,
1389                reserves_rowid
1390            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1391        )
1392        .bind(event.position_id.0)
1393        .bind(height)
1394        .bind(time)
1395        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1396        .bind(event.sequence as i32)
1397        .bind(BigDecimal::from(event.reserves_1.value()))
1398        .bind(BigDecimal::from(event.reserves_2.value()))
1399        .bind(reserves_rowid)
1400        .execute(dbtx.as_mut())
1401        .await?;
1402
1403        Ok(())
1404    }
1405
1406    async fn record_transaction(
1407        &self,
1408        dbtx: &mut PgTransaction<'_>,
1409        time: DateTime,
1410        height: u64,
1411        transaction_id: [u8; 32],
1412        transaction: Transaction,
1413    ) -> anyhow::Result<()> {
1414        if transaction.transaction_body.actions.is_empty() {
1415            return Ok(());
1416        }
1417        sqlx::query(
1418            "INSERT INTO dex_ex_transactions (
1419                transaction_id,
1420                transaction,
1421                height,
1422                time
1423            ) VALUES ($1, $2, $3, $4)
1424            ",
1425        )
1426        .bind(transaction_id)
1427        .bind(transaction.encode_to_vec())
1428        .bind(i32::try_from(height)?)
1429        .bind(time)
1430        .execute(dbtx.as_mut())
1431        .await?;
1432
1433        Ok(())
1434    }
1435
1436    async fn record_all_transactions(
1437        &self,
1438        dbtx: &mut PgTransaction<'_>,
1439        time: DateTime,
1440        block: &BlockEvents,
1441    ) -> anyhow::Result<()> {
1442        for (tx_id, tx_bytes) in block.transactions() {
1443            let tx = Transaction::try_from(tx_bytes)?;
1444            let height = block.height();
1445            self.record_transaction(dbtx, time, height, tx_id, tx)
1446                .await?;
1447        }
1448        Ok(())
1449    }
1450}
1451
1452#[async_trait]
1453impl AppView for Component {
1454    async fn init_chain(
1455        &self,
1456        dbtx: &mut PgTransaction,
1457        _: &serde_json::Value,
1458    ) -> Result<(), anyhow::Error> {
1459        for statement in include_str!("schema.sql").split(";") {
1460            sqlx::query(statement).execute(dbtx.as_mut()).await?;
1461        }
1462        Ok(())
1463    }
1464
1465    fn name(&self) -> String {
1466        "dex_ex".to_string()
1467    }
1468
1469    async fn index_batch(
1470        &self,
1471        dbtx: &mut PgTransaction,
1472        batch: EventBatch,
1473        ctx: EventBatchContext,
1474    ) -> Result<(), anyhow::Error> {
1475        metadata::set(dbtx, self.denom).await?;
1476        let mut charts = HashMap::new();
1477        let mut snapshots = HashMap::new();
1478        let mut last_time = None;
1479        for block in batch.events_by_block() {
1480            let mut events = Events::extract(&block)?;
1481            let time = events
1482                .time
1483                .expect(&format!("no block root event at height {}", block.height()));
1484            last_time = Some(time);
1485
1486            self.record_all_transactions(dbtx, time, block).await?;
1487
1488            // Load any missing positions before processing events
1489            events.load_positions(dbtx).await?;
1490
1491            // This is where we are going to build the block summary for the DEX.
1492            self.record_block_summary(dbtx, time, block.height() as i32, &events)
1493                .await?;
1494
1495            // Record batch swap execution traces.
1496            for event in &events.batch_swaps {
1497                self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
1498                    .await?;
1499            }
1500
1501            // Record position opens
1502            for event in &events.position_opens {
1503                let tx_hash = events.position_open_txs.get(&event.position_id).copied();
1504                self.record_position_open(dbtx, time, events.height, tx_hash, event)
1505                    .await?;
1506            }
1507
1508            // Process position executions
1509            for event in &events.position_executions {
1510                self.record_position_execution(dbtx, time, events.height, event, &events.positions)
1511                    .await?;
1512            }
1513
1514            // Record position closes
1515            for event in &events.position_closes {
1516                let tx_hash = events.position_close_txs.get(&event.position_id).copied();
1517                self.record_position_close(dbtx, time, events.height, tx_hash, event)
1518                    .await?;
1519            }
1520
1521            // Record position withdrawals
1522            for event in &events.position_withdrawals {
1523                let tx_hash = events
1524                    .position_withdrawal_txs
1525                    .get(&event.position_id)
1526                    .copied();
1527                self.record_position_withdraw(dbtx, time, events.height, tx_hash, event)
1528                    .await?;
1529            }
1530
1531            for (pair, candle) in &events.candles {
1532                for window in Window::all() {
1533                    let key = (pair.start, pair.end, window);
1534                    if !charts.contains_key(&key) {
1535                        let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?;
1536                        charts.insert(key, ctx);
1537                    }
1538                    charts
1539                        .get_mut(&key)
1540                        .unwrap() // safe because we just inserted above
1541                        .update(dbtx, time, *candle)
1542                        .await?;
1543                }
1544            }
1545
1546            let block_pairs = events
1547                .candles
1548                .keys()
1549                .chain(events.metrics.keys())
1550                .copied()
1551                .collect::<HashSet<_>>();
1552            for pair in block_pairs {
1553                if !snapshots.contains_key(&pair) {
1554                    let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?;
1555                    snapshots.insert(pair, ctx);
1556                }
1557                // NOPANIC: inserted above
1558                snapshots
1559                    .get_mut(&pair)
1560                    .unwrap()
1561                    .update(
1562                        dbtx,
1563                        time,
1564                        events.candles.get(&pair).copied(),
1565                        events.metrics.get(&pair).copied().unwrap_or_default(),
1566                        events.price_for(self.denom, pair.start),
1567                    )
1568                    .await?;
1569            }
1570        }
1571
1572        if ctx.is_last() {
1573            if let Some(now) = last_time {
1574                for window in Window::all() {
1575                    summary::update_summaries(dbtx, now, window).await?;
1576                    summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity)
1577                        .await?;
1578                }
1579            }
1580        }
1581        for chart in charts.into_values() {
1582            chart.unload(dbtx).await?;
1583        }
1584        Ok(())
1585    }
1586}