pindexer/dex_ex/
mod.rs

1use anyhow::anyhow;
2use clap::Result;
3use cometindex::{
4    async_trait,
5    index::{BlockEvents, EventBatch, EventBatchContext, Version},
6    AppView, PgTransaction,
7};
8use penumbra_sdk_asset::{asset, Value, STAKING_TOKEN_ASSET_ID};
9use penumbra_sdk_dex::{
10    event::{
11        EventArbExecution, EventBatchSwap, EventPositionClose, EventPositionExecution,
12        EventPositionOpen, EventPositionWithdraw, EventQueuePositionClose,
13    },
14    lp::{position::Flows, Reserves},
15    DirectedTradingPair, SwapExecution, TradingPair,
16};
17use penumbra_sdk_dex::{
18    event::{EventSwap, EventSwapClaim},
19    lp::position::{Id as PositionId, Position},
20};
21use penumbra_sdk_funding::event::EventLqtPositionReward;
22use penumbra_sdk_num::Amount;
23use penumbra_sdk_proto::event::EventDomainType;
24use penumbra_sdk_proto::DomainType;
25use penumbra_sdk_sct::event::EventBlockRoot;
26use penumbra_sdk_transaction::Transaction;
27use sqlx::types::BigDecimal;
28use sqlx::Row;
29use std::collections::{BTreeMap, HashMap, HashSet};
30
31type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
32
33mod candle {
34    use super::DateTime;
35    use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
36    use penumbra_sdk_dex::CandlestickData;
37    use std::fmt::Display;
38
39    /// Candlestick data, unmoored from the prison of a particular block height.
40    ///
41    /// In other words, this can represent candlesticks which span arbitrary windows,
42    /// and not just a single block.
43    #[derive(Debug, Clone, Copy)]
44    pub struct Candle {
45        pub open: f64,
46        pub close: f64,
47        pub low: f64,
48        pub high: f64,
49        pub direct_volume: f64,
50        pub swap_volume: f64,
51    }
52
53    impl Candle {
54        pub fn from_candlestick_data(data: &CandlestickData) -> Self {
55            // The volume is tracked in terms of input asset.
56            // We can use the closing price (aka. the common clearing price) to convert
57            // the volume to the other direction i.e, the batch swap output.
58            Self {
59                open: data.open,
60                close: data.close,
61                low: data.low,
62                high: data.high,
63                direct_volume: data.direct_volume,
64                swap_volume: data.swap_volume,
65            }
66        }
67
68        pub fn point(price: f64, direct_volume: f64) -> Self {
69            Self {
70                open: price,
71                close: price,
72                low: price,
73                high: price,
74                direct_volume,
75                swap_volume: 0.0,
76            }
77        }
78
79        pub fn merge(&mut self, that: &Self) {
80            self.close = that.close;
81            self.low = self.low.min(that.low);
82            self.high = self.high.max(that.high);
83            self.direct_volume += that.direct_volume;
84            self.swap_volume += that.swap_volume;
85        }
86    }
87
88    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
89    pub enum Window {
90        W1m,
91        W15m,
92        W1h,
93        W4h,
94        W1d,
95        W1w,
96        W1mo,
97    }
98
99    impl Window {
100        pub fn all() -> impl Iterator<Item = Self> {
101            [
102                Window::W1m,
103                Window::W15m,
104                Window::W1h,
105                Window::W4h,
106                Window::W1d,
107                Window::W1w,
108                Window::W1mo,
109            ]
110            .into_iter()
111        }
112
113        /// Get the anchor for a given time.
114        ///
115        /// This is the latest time that "snaps" to a given anchor, dependent on the window.
116        ///
117        /// For example, the 1 minute window has an anchor every minute, the day window
118        /// every day, etc.
119        pub fn anchor(&self, time: DateTime) -> DateTime {
120            let (y, mo, d, h, m) = (
121                time.year(),
122                time.month(),
123                time.day(),
124                time.hour(),
125                time.minute(),
126            );
127            let out = match self {
128                Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
129                Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
130                Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
131                Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
132                Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
133                Window::W1w => Utc
134                    .with_ymd_and_hms(y, mo, d, 0, 0, 0)
135                    .single()
136                    .and_then(|x| {
137                        x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
138                    }),
139                Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
140            };
141            out.unwrap()
142        }
143
144        pub fn subtract_from(&self, time: DateTime) -> DateTime {
145            let delta = match self {
146                Window::W1m => TimeDelta::minutes(1),
147                Window::W15m => TimeDelta::minutes(15),
148                Window::W1h => TimeDelta::hours(1),
149                Window::W4h => TimeDelta::hours(4),
150                Window::W1d => TimeDelta::days(1),
151                Window::W1w => TimeDelta::weeks(1),
152                Window::W1mo => TimeDelta::days(30),
153            };
154            time - delta
155        }
156    }
157
158    impl Display for Window {
159        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160            use Window::*;
161            let str = match self {
162                W1m => "1m",
163                W15m => "15m",
164                W1h => "1h",
165                W4h => "4h",
166                W1d => "1d",
167                W1w => "1w",
168                W1mo => "1mo",
169            };
170            write!(f, "{}", str)
171        }
172    }
173
174    #[derive(Debug)]
175    pub struct WindowedCandle {
176        start: DateTime,
177        window: Window,
178        candle: Candle,
179    }
180
181    impl WindowedCandle {
182        pub fn new(now: DateTime, window: Window, candle: Candle) -> Self {
183            Self {
184                start: window.anchor(now),
185                window,
186                candle,
187            }
188        }
189
190        /// Update with a new candlestick, at a given time.
191        ///
192        /// This may return the old candlestick and its start time, if we should be starting
193        /// a new candle, based on the candle for that window having already been closed.
194        pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> {
195            let start = self.window.anchor(now);
196            // This candle belongs to the next window!
197            if start > self.start {
198                let old_start = std::mem::replace(&mut self.start, start);
199                let old_candle = std::mem::replace(&mut self.candle, candle);
200                Some((old_start, old_candle))
201            } else {
202                self.candle.merge(&candle);
203                None
204            }
205        }
206
207        pub fn window(&self) -> Window {
208            self.window
209        }
210
211        pub fn flush(self) -> (DateTime, Candle) {
212            (self.start, self.candle)
213        }
214    }
215}
216pub use candle::{Candle, Window, WindowedCandle};
217
218mod price_chart {
219    use super::*;
220
221    /// A context when processing a price chart.
222    #[derive(Debug)]
223    pub struct Context {
224        asset_start: asset::Id,
225        asset_end: asset::Id,
226        window: Window,
227        state: Option<WindowedCandle>,
228    }
229
230    impl Context {
231        pub async fn load(
232            dbtx: &mut PgTransaction<'_>,
233            asset_start: asset::Id,
234            asset_end: asset::Id,
235            window: Window,
236        ) -> anyhow::Result<Self> {
237            let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as(
238                "
239                SELECT open, close, high, low, direct_volume, swap_volume, start_time
240                FROM dex_ex_price_charts
241                WHERE asset_start = $1
242                AND asset_end = $2
243                AND the_window = $3
244                ORDER BY start_time DESC
245                LIMIT 1
246            ",
247            )
248            .bind(asset_start.to_bytes())
249            .bind(asset_end.to_bytes())
250            .bind(window.to_string())
251            .fetch_optional(dbtx.as_mut())
252            .await?;
253            let state = row.map(
254                |(open, close, high, low, direct_volume, swap_volume, start)| {
255                    let candle = Candle {
256                        open,
257                        close,
258                        low,
259                        high,
260                        direct_volume,
261                        swap_volume,
262                    };
263                    WindowedCandle::new(start, window, candle)
264                },
265            );
266            Ok(Self {
267                asset_start,
268                asset_end,
269                window,
270                state,
271            })
272        }
273
274        async fn write_candle(
275            &self,
276            dbtx: &mut PgTransaction<'_>,
277            start: DateTime,
278            candle: Candle,
279        ) -> anyhow::Result<()> {
280            sqlx::query(
281                "
282            INSERT INTO dex_ex_price_charts(
283                id, asset_start, asset_end, the_window,
284                start_time, open, close, high, low, direct_volume, swap_volume
285            ) 
286            VALUES(
287                DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
288            )
289            ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET
290                open = EXCLUDED.open,
291                close = EXCLUDED.close,
292                high = EXCLUDED.high,
293                low = EXCLUDED.low,
294                direct_volume = EXCLUDED.direct_volume,
295                swap_volume = EXCLUDED.swap_volume
296            ",
297            )
298            .bind(self.asset_start.to_bytes())
299            .bind(self.asset_end.to_bytes())
300            .bind(self.window.to_string())
301            .bind(start)
302            .bind(candle.open)
303            .bind(candle.close)
304            .bind(candle.high)
305            .bind(candle.low)
306            .bind(candle.direct_volume)
307            .bind(candle.swap_volume)
308            .execute(dbtx.as_mut())
309            .await?;
310            Ok(())
311        }
312
313        pub async fn update(
314            &mut self,
315            dbtx: &mut PgTransaction<'_>,
316            now: DateTime,
317            candle: Candle,
318        ) -> anyhow::Result<()> {
319            let state = match self.state.as_mut() {
320                Some(x) => x,
321                None => {
322                    self.state = Some(WindowedCandle::new(now, self.window, candle));
323                    self.state.as_mut().unwrap()
324                }
325            };
326            if let Some((start, old_candle)) = state.with_candle(now, candle) {
327                self.write_candle(dbtx, start, old_candle).await?;
328            };
329            Ok(())
330        }
331
332        pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
333            let state = std::mem::replace(&mut self.state, None);
334            if let Some(state) = state {
335                let (start, candle) = state.flush();
336                self.write_candle(dbtx, start, candle).await?;
337            }
338            Ok(())
339        }
340    }
341}
342
343use price_chart::Context as PriceChartContext;
344
345mod summary {
346    use cometindex::PgTransaction;
347    use penumbra_sdk_asset::asset;
348
349    use super::{Candle, DateTime, PairMetrics, Window};
350
351    pub struct Context {
352        start: asset::Id,
353        end: asset::Id,
354        price: f64,
355        liquidity: f64,
356        start_price_indexing_denom: f64,
357    }
358
359    impl Context {
360        pub async fn load(
361            dbtx: &mut PgTransaction<'_>,
362            start: asset::Id,
363            end: asset::Id,
364        ) -> anyhow::Result<Self> {
365            let row: Option<(f64, f64, f64)> = sqlx::query_as(
366                "
367                SELECT price, liquidity, start_price_indexing_denom
368                FROM dex_ex_pairs_block_snapshot
369                WHERE asset_start = $1
370                AND asset_end = $2
371                ORDER BY id DESC
372                LIMIT 1
373            ",
374            )
375            .bind(start.to_bytes())
376            .bind(end.to_bytes())
377            .fetch_optional(dbtx.as_mut())
378            .await?;
379            let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
380            Ok(Self {
381                start,
382                end,
383                price,
384                liquidity,
385                start_price_indexing_denom,
386            })
387        }
388
389        pub async fn update(
390            &mut self,
391            dbtx: &mut PgTransaction<'_>,
392            now: DateTime,
393            candle: Option<Candle>,
394            metrics: PairMetrics,
395            start_price_indexing_denom: Option<f64>,
396        ) -> anyhow::Result<()> {
397            if let Some(candle) = candle {
398                self.price = candle.close;
399            }
400            if let Some(price) = start_price_indexing_denom {
401                self.start_price_indexing_denom = price;
402            }
403            self.liquidity += metrics.liquidity_change;
404
405            sqlx::query(
406                "
407            INSERT INTO dex_ex_pairs_block_snapshot VALUES (
408                DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
409            )        
410        ",
411            )
412            .bind(now)
413            .bind(self.start.to_bytes())
414            .bind(self.end.to_bytes())
415            .bind(self.price)
416            .bind(self.liquidity)
417            .bind(candle.map(|x| x.direct_volume).unwrap_or_default())
418            .bind(candle.map(|x| x.swap_volume).unwrap_or_default())
419            .bind(self.start_price_indexing_denom)
420            .bind(metrics.trades)
421            .execute(dbtx.as_mut())
422            .await?;
423            Ok(())
424        }
425    }
426
427    pub async fn update_summaries(
428        dbtx: &mut PgTransaction<'_>,
429        now: DateTime,
430        window: Window,
431    ) -> anyhow::Result<()> {
432        let then = window.subtract_from(now);
433        let pairs = sqlx::query_as::<_, (Vec<u8>, Vec<u8>)>(
434            "SELECT asset_start, asset_end FROM dex_ex_pairs_block_snapshot GROUP BY asset_start, asset_end",
435        )
436        .fetch_all(dbtx.as_mut())
437        .await?;
438        for (start, end) in pairs {
439            sqlx::query(
440                "
441            WITH
442            snapshots AS (
443                SELECT *
444                FROM dex_ex_pairs_block_snapshot
445                WHERE asset_start = $1
446                AND asset_end = $2
447            ),
448            previous AS (
449                SELECT price AS price_then, liquidity AS liquidity_then
450                FROM snapshots
451                WHERE time <= $4
452                ORDER BY time DESC
453                LIMIT 1
454            ),
455            previous_or_default AS (
456                SELECT
457                    COALESCE((SELECT price_then FROM previous), 0.0) AS price_then,
458                    COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then
459            ),
460            now AS (
461                SELECT price, liquidity            
462                FROM snapshots
463                WHERE time <= $3
464                ORDER BY time DESC
465                LIMIT 1
466            ),
467            sums AS (
468                SELECT
469                    COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
470                    COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
471                    COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
472                    COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
473                    COALESCE(SUM(trades), 0.0) AS trades_over_window,
474                    COALESCE(MIN(price), 0.0) AS low,
475                    COALESCE(MAX(price), 0.0) AS high
476                FROM snapshots
477                WHERE time <= $3
478                AND time >= $4
479            )
480            INSERT INTO dex_ex_pairs_summary
481            SELECT 
482                $1, $2, $5,
483                price, price_then,
484                low, high,
485                liquidity, liquidity_then,
486                direct_volume_over_window,
487                swap_volume_over_window,
488                direct_volume_indexing_denom_over_window,
489                swap_volume_indexing_denom_over_window,
490                trades_over_window
491            FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
492            ON CONFLICT (asset_start, asset_end, the_window)
493            DO UPDATE SET
494                price = EXCLUDED.price,
495                price_then = EXCLUDED.price_then,
496                liquidity = EXCLUDED.liquidity,
497                liquidity_then = EXCLUDED.liquidity_then,
498                direct_volume_over_window = EXCLUDED.direct_volume_over_window,
499                swap_volume_over_window = EXCLUDED.swap_volume_over_window,
500                direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
501                swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
502                trades_over_window = EXCLUDED.trades_over_window
503            ",
504            )
505            .bind(start)
506            .bind(end)
507            .bind(now)
508            .bind(then)
509            .bind(window.to_string())
510            .execute(dbtx.as_mut())
511            .await?;
512        }
513        Ok(())
514    }
515
516    pub async fn update_aggregate_summary(
517        dbtx: &mut PgTransaction<'_>,
518        window: Window,
519        denom: asset::Id,
520        min_liquidity: f64,
521    ) -> anyhow::Result<()> {
522        // TODO: do something here
523        sqlx::query(
524            "
525        WITH       
526            eligible_denoms AS (
527                SELECT asset_start as asset, price
528                FROM dex_ex_pairs_summary
529                WHERE asset_end = $1
530                AND liquidity >= $2
531                UNION VALUES ($1, 1.0)
532            ),
533            converted_pairs_summary AS (
534                SELECT
535                    asset_start, asset_end,
536                    (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
537                    liquidity * ed_end.price AS liquidity,
538                    direct_volume_indexing_denom_over_window AS dv,
539                    swap_volume_indexing_denom_over_window AS sv,
540                    trades_over_window as trades
541                FROM dex_ex_pairs_summary
542                JOIN eligible_denoms AS ed_end
543                ON ed_end.asset = asset_end
544                JOIN eligible_denoms AS ed_start
545                ON ed_start.asset = asset_start
546                WHERE the_window = $3
547            ),
548            sums AS (
549                SELECT
550                    SUM(dv) AS direct_volume,
551                    SUM(sv) AS swap_volume,
552                    SUM(trades) AS trades
553                FROM converted_pairs_summary WHERE asset_start < asset_end
554            ),
555            total_liquidity AS (
556                SELECT
557                    SUM(liquidity) AS liquidity
558                FROM converted_pairs_summary
559            ),
560            undirected_pairs_summary AS (
561              WITH pairs AS (
562                  SELECT asset_start AS a_start, asset_end AS a_end FROM converted_pairs_summary WHERE asset_start < asset_end
563              )
564              SELECT
565                  a_start AS asset_start,
566                  a_end AS asset_end,
567                  (SELECT SUM(sv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS sv,
568                  (SELECT SUM(dv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS dv
569              FROM pairs
570            ),
571            counts AS (
572              SELECT COUNT(*) AS active_pairs FROM undirected_pairs_summary WHERE dv > 0
573            ),
574            largest_sv AS (
575                SELECT
576                    asset_start AS largest_sv_trading_pair_start,
577                    asset_end AS largest_sv_trading_pair_end,
578                    sv AS largest_sv_trading_pair_volume                    
579                FROM undirected_pairs_summary
580                ORDER BY sv DESC
581                LIMIT 1
582            ),
583            largest_dv AS (
584                SELECT
585                    asset_start AS largest_dv_trading_pair_start,
586                    asset_end AS largest_dv_trading_pair_end,
587                    dv AS largest_dv_trading_pair_volume                    
588                FROM undirected_pairs_summary
589                ORDER BY dv DESC
590                LIMIT 1
591            ),
592            top_price_mover AS (
593                SELECT
594                    asset_start AS top_price_mover_start,
595                    asset_end AS top_price_mover_end,
596                    price_change AS top_price_mover_change_percent
597                FROM converted_pairs_summary
598                ORDER BY price_change DESC
599                LIMIT 1
600            )
601        INSERT INTO dex_ex_aggregate_summary
602        SELECT
603            $3,
604            direct_volume, swap_volume, liquidity, trades, active_pairs,
605            largest_sv_trading_pair_start,
606            largest_sv_trading_pair_end,
607            largest_sv_trading_pair_volume,
608            largest_dv_trading_pair_start,
609            largest_dv_trading_pair_end,
610            largest_dv_trading_pair_volume,
611            top_price_mover_start,
612            top_price_mover_end,
613            top_price_mover_change_percent
614        FROM
615            sums
616        JOIN largest_sv ON TRUE
617        JOIN largest_dv ON TRUE
618        JOIN top_price_mover ON TRUE
619        JOIN total_liquidity ON TRUE
620        JOIN counts ON TRUE
621        ON CONFLICT (the_window) DO UPDATE SET
622            direct_volume = EXCLUDED.direct_volume,
623            swap_volume = EXCLUDED.swap_volume,
624            liquidity = EXCLUDED.liquidity,
625            trades = EXCLUDED.trades,
626            active_pairs = EXCLUDED.active_pairs,          
627            largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start,
628            largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end,
629            largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume,
630            largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start,
631            largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end,
632            largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume,
633            top_price_mover_start = EXCLUDED.top_price_mover_start,
634            top_price_mover_end = EXCLUDED.top_price_mover_end,
635            top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent
636        ")
637        .bind(denom.to_bytes())
638        .bind(min_liquidity)
639        .bind(window.to_string())
640        .execute(dbtx.as_mut())
641        .await?;
642        Ok(())
643    }
644}
645
646mod metadata {
647    use super::*;
648
649    pub async fn set(dbtx: &mut PgTransaction<'_>, quote_asset: asset::Id) -> anyhow::Result<()> {
650        sqlx::query(
651            "
652        INSERT INTO dex_ex_metadata
653        VALUES (1, $1)
654        ON CONFLICT (id) DO UPDATE 
655        SET id = EXCLUDED.id,
656            quote_asset_id = EXCLUDED.quote_asset_id
657        ",
658        )
659        .bind(quote_asset.to_bytes())
660        .execute(dbtx.as_mut())
661        .await?;
662        Ok(())
663    }
664}
665
666#[derive(Debug, Default, Clone, Copy)]
667struct PairMetrics {
668    trades: f64,
669    liquidity_change: f64,
670}
671
672#[derive(Debug, Clone, serde::Serialize)]
673struct BatchSwapSummary {
674    asset_start: asset::Id,
675    asset_end: asset::Id,
676    input: Amount,
677    output: Amount,
678    num_swaps: i32,
679    price_float: f64,
680}
681
682#[derive(Debug)]
683struct Events {
684    time: Option<DateTime>,
685    height: i32,
686    candles: HashMap<DirectedTradingPair, Candle>,
687    metrics: HashMap<DirectedTradingPair, PairMetrics>,
688    // Relevant positions.
689    positions: BTreeMap<PositionId, Position>,
690    // Store events
691    position_opens: Vec<EventPositionOpen>,
692    position_executions: Vec<EventPositionExecution>,
693    position_closes: Vec<EventPositionClose>,
694    position_withdrawals: Vec<EventPositionWithdraw>,
695    batch_swaps: Vec<EventBatchSwap>,
696    swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
697    swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
698    // Track transaction hashes by position ID
699    position_open_txs: BTreeMap<PositionId, [u8; 32]>,
700    position_close_txs: BTreeMap<PositionId, [u8; 32]>,
701    position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
702}
703
704impl Events {
705    fn new() -> Self {
706        Self {
707            time: None,
708            height: 0,
709            candles: HashMap::new(),
710            metrics: HashMap::new(),
711            positions: BTreeMap::new(),
712            position_opens: Vec::new(),
713            position_executions: Vec::new(),
714            position_closes: Vec::new(),
715            position_withdrawals: Vec::new(),
716            batch_swaps: Vec::new(),
717            swaps: BTreeMap::new(),
718            swap_claims: BTreeMap::new(),
719            position_open_txs: BTreeMap::new(),
720            position_close_txs: BTreeMap::new(),
721            position_withdrawal_txs: BTreeMap::new(),
722        }
723    }
724
725    fn with_time(&mut self, time: DateTime) {
726        self.time = Some(time)
727    }
728
729    fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
730        if !self.metrics.contains_key(pair) {
731            self.metrics.insert(*pair, PairMetrics::default());
732        }
733        // NOPANIC: inserted above.
734        self.metrics.get_mut(pair).unwrap()
735    }
736
737    fn with_trace(&mut self, input: Value, output: Value) {
738        let pair_1_2 = DirectedTradingPair {
739            start: input.asset_id,
740            end: output.asset_id,
741        };
742        // This shouldn't happen, but to avoid weird stuff later, let's ignore this trace.
743        if pair_1_2.start == pair_1_2.end {
744            tracing::warn!(?input, ?output, "found trace with a loop?");
745            return;
746        }
747        let pair_2_1 = pair_1_2.flip();
748        let input_amount = f64::from(input.amount);
749        let output_amount = f64::from(output.amount);
750        let price_1_2 = output_amount / input_amount;
751        let candle_1_2 = Candle::point(price_1_2, input_amount);
752        let candle_2_1 = Candle::point(1.0 / price_1_2, output_amount);
753        self.metric(&pair_1_2).trades += 1.0;
754        self.candles
755            .entry(pair_1_2)
756            .and_modify(|c| c.merge(&candle_1_2))
757            .or_insert(candle_1_2);
758        self.metric(&pair_2_1).trades += 1.0;
759        self.candles
760            .entry(pair_2_1)
761            .and_modify(|c| c.merge(&candle_2_1))
762            .or_insert(candle_2_1);
763    }
764
765    fn with_swap_execution(&mut self, se: &SwapExecution) {
766        for row in se.traces.iter() {
767            for window in row.windows(2) {
768                self.with_trace(window[0], window[1]);
769            }
770        }
771        let pair_1_2 = DirectedTradingPair {
772            start: se.input.asset_id,
773            end: se.output.asset_id,
774        };
775        // When doing arb, we don't want to report the volume on UM -> UM,
776        // so we need this check.
777        if pair_1_2.start == pair_1_2.end {
778            return;
779        }
780        let pair_2_1 = pair_1_2.flip();
781        self.candles
782            .entry(pair_1_2)
783            .and_modify(|c| c.swap_volume += f64::from(se.input.amount));
784        self.candles
785            .entry(pair_2_1)
786            .and_modify(|c| c.swap_volume += f64::from(se.output.amount));
787    }
788
789    fn with_reserve_change(
790        &mut self,
791        pair: &TradingPair,
792        old_reserves: Option<Reserves>,
793        new_reserves: Reserves,
794        removed: bool,
795    ) {
796        let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
797            (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
798            (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
799            (_, Some(old), new) => (
800                (new.r1.value() as f64) - (old.r1.value() as f64),
801                (new.r2.value() as f64) - (old.r2.value() as f64),
802            ),
803        };
804        for (d_pair, diff) in [
805            (
806                DirectedTradingPair {
807                    start: pair.asset_1(),
808                    end: pair.asset_2(),
809                },
810                diff_2,
811            ),
812            (
813                DirectedTradingPair {
814                    start: pair.asset_2(),
815                    end: pair.asset_1(),
816                },
817                diff_1,
818            ),
819        ] {
820            self.metric(&d_pair).liquidity_change += diff;
821        }
822    }
823
824    pub fn extract(block: &BlockEvents) -> anyhow::Result<Self> {
825        let mut out = Self::new();
826        out.height = block.height() as i32;
827
828        for event in block.events() {
829            if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
830                let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
831                    "creating timestamp should succeed; timestamp: {}",
832                    e.timestamp_seconds
833                ))?;
834                out.with_time(time);
835            } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
836                out.with_reserve_change(
837                    &e.trading_pair,
838                    None,
839                    Reserves {
840                        r1: e.reserves_1,
841                        r2: e.reserves_2,
842                    },
843                    false,
844                );
845                if let Some(tx_hash) = event.tx_hash() {
846                    out.position_open_txs.insert(e.position_id, tx_hash);
847                }
848                // A newly opened position might be executed against in this block,
849                // but wouldn't already be in the database. Adding it here ensures
850                // it's available.
851                out.positions.insert(e.position_id, e.position.clone());
852                out.position_opens.push(e);
853            } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
854                // TODO: use close positions to track liquidity more precisely, in practic I (ck) expect few
855                // positions to close with being withdrawn.
856                out.with_reserve_change(
857                    &e.trading_pair,
858                    None,
859                    Reserves {
860                        r1: e.reserves_1,
861                        r2: e.reserves_2,
862                    },
863                    true,
864                );
865                if let Some(tx_hash) = event.tx_hash() {
866                    out.position_withdrawal_txs.insert(e.position_id, tx_hash);
867                }
868                out.position_withdrawals.push(e);
869            } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
870                out.with_reserve_change(
871                    &e.trading_pair,
872                    Some(Reserves {
873                        r1: e.prev_reserves_1,
874                        r2: e.prev_reserves_2,
875                    }),
876                    Reserves {
877                        r1: e.reserves_1,
878                        r2: e.reserves_2,
879                    },
880                    false,
881                );
882                out.position_executions.push(e);
883            } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) {
884                out.position_closes.push(e);
885            } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
886                // The position close event is emitted by the dex module at EOB,
887                // so we need to track it with the tx hash of the closure tx.
888                if let Some(tx_hash) = event.tx_hash() {
889                    out.position_close_txs.insert(e.position_id, tx_hash);
890                }
891            } else if let Ok(e) = EventSwap::try_from_event(&event.event) {
892                out.swaps
893                    .entry(e.trading_pair)
894                    .or_insert_with(Vec::new)
895                    .push(e);
896            } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
897                out.swap_claims
898                    .entry(e.trading_pair)
899                    .or_insert_with(Vec::new)
900                    .push(e);
901            } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
902                let pair = DirectedTradingPair {
903                    start: e.incentivized_asset_id,
904                    end: *STAKING_TOKEN_ASSET_ID,
905                };
906                out.metric(&pair).liquidity_change += e.reward_amount.value() as f64;
907            } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
908                // NOTE: order matters here, 2 for 1 happened after.
909                if let Some(se) = e.swap_execution_1_for_2.as_ref() {
910                    out.with_swap_execution(se);
911                }
912                if let Some(se) = e.swap_execution_2_for_1.as_ref() {
913                    out.with_swap_execution(se);
914                }
915                out.batch_swaps.push(e);
916            } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) {
917                out.with_swap_execution(&e.swap_execution);
918            }
919        }
920        Ok(out)
921    }
922
923    async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
924        // Collect position IDs that we need but don't already have
925        let missing_positions: Vec<_> = self
926            .position_executions
927            .iter()
928            .map(|e| e.position_id)
929            .filter(|id| !self.positions.contains_key(id))
930            .collect();
931
932        if missing_positions.is_empty() {
933            return Ok(());
934        }
935
936        // Load missing positions from database
937        let rows = sqlx::query(
938            "SELECT position_raw 
939             FROM dex_ex_position_state 
940             WHERE position_id = ANY($1)",
941        )
942        .bind(
943            &missing_positions
944                .iter()
945                .map(|id| id.0.as_ref())
946                .collect::<Vec<_>>(),
947        )
948        .fetch_all(dbtx.as_mut())
949        .await?;
950
951        // Decode and store each position
952        for row in rows {
953            let position_raw: Vec<u8> = row.get("position_raw");
954            let position = Position::decode(position_raw.as_slice())?;
955            self.positions.insert(position.id(), position);
956        }
957
958        Ok(())
959    }
960
961    /// Attempt to find the price, relative to a given indexing denom, for a particular asset, in this block.
962    pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
963        self.candles
964            .get(&DirectedTradingPair::new(asset, indexing_denom))
965            .map(|x| x.close)
966    }
967}
968
969#[derive(Debug)]
970pub struct Component {
971    denom: asset::Id,
972    min_liquidity: f64,
973}
974
975impl Component {
976    pub fn new(denom: asset::Id, min_liquidity: f64) -> Self {
977        Self {
978            denom,
979            min_liquidity,
980        }
981    }
982
983    async fn record_position_open(
984        &self,
985        dbtx: &mut PgTransaction<'_>,
986        time: DateTime,
987        height: i32,
988        tx_hash: Option<[u8; 32]>,
989        event: &EventPositionOpen,
990    ) -> anyhow::Result<()> {
991        // Get effective prices by orienting the trading function in each direction
992        let effective_price_1_to_2: f64 = event
993            .position
994            .phi
995            .orient_start(event.trading_pair.asset_1())
996            .expect("position trading pair matches")
997            .effective_price()
998            .into();
999
1000        let effective_price_2_to_1: f64 = event
1001            .position
1002            .phi
1003            .orient_start(event.trading_pair.asset_2())
1004            .expect("position trading pair matches")
1005            .effective_price()
1006            .into();
1007
1008        // First insert initial reserves and get the rowid
1009        let opening_reserves_rowid = sqlx::query_scalar::<_, i32>(
1010            "INSERT INTO dex_ex_position_reserves (
1011                position_id,
1012                height,
1013                time,
1014                reserves_1,
1015                reserves_2
1016            ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1017        )
1018        .bind(event.position_id.0)
1019        .bind(height)
1020        .bind(time)
1021        .bind(BigDecimal::from(event.reserves_1.value()))
1022        .bind(BigDecimal::from(event.reserves_2.value()))
1023        .fetch_one(dbtx.as_mut())
1024        .await?;
1025
1026        // Then insert position state with the opening_reserves_rowid
1027        sqlx::query(
1028            "INSERT INTO dex_ex_position_state (
1029                position_id,
1030                asset_1,
1031                asset_2,
1032                p,
1033                q,
1034                close_on_fill,
1035                fee_bps,
1036                effective_price_1_to_2,
1037                effective_price_2_to_1,
1038                position_raw,
1039                opening_time,
1040                opening_height,
1041                opening_tx,
1042                opening_reserves_rowid
1043            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1044        )
1045        .bind(event.position_id.0)
1046        .bind(event.trading_pair.asset_1().to_bytes())
1047        .bind(event.trading_pair.asset_2().to_bytes())
1048        .bind(BigDecimal::from(event.position.phi.component.p.value()))
1049        .bind(BigDecimal::from(event.position.phi.component.q.value()))
1050        .bind(event.position.close_on_fill)
1051        .bind(event.trading_fee as i32)
1052        .bind(effective_price_1_to_2)
1053        .bind(effective_price_2_to_1)
1054        .bind(event.position.encode_to_vec())
1055        .bind(time)
1056        .bind(height)
1057        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1058        .bind(opening_reserves_rowid)
1059        .execute(dbtx.as_mut())
1060        .await?;
1061
1062        Ok(())
1063    }
1064
1065    async fn record_swap_execution_traces(
1066        &self,
1067        dbtx: &mut PgTransaction<'_>,
1068        time: DateTime,
1069        height: i32,
1070        swap_execution: &SwapExecution,
1071    ) -> anyhow::Result<()> {
1072        let SwapExecution {
1073            traces,
1074            input: se_input,
1075            output: se_output,
1076        } = swap_execution;
1077
1078        let asset_start = se_input.asset_id;
1079        let asset_end = se_output.asset_id;
1080        let batch_input = se_input.amount;
1081        let batch_output = se_output.amount;
1082
1083        for trace in traces.iter() {
1084            let Some(input_value) = trace.first() else {
1085                continue;
1086            };
1087            let Some(output_value) = trace.last() else {
1088                continue;
1089            };
1090
1091            let input = input_value.amount;
1092            let output = output_value.amount;
1093
1094            let price_float = (output.value() as f64) / (input.value() as f64);
1095            let amount_hops = trace
1096                .iter()
1097                .map(|x| BigDecimal::from(x.amount.value()))
1098                .collect::<Vec<_>>();
1099            let position_id_hops: Vec<[u8; 32]> = vec![];
1100            let asset_hops = trace
1101                .iter()
1102                .map(|x| x.asset_id.to_bytes())
1103                .collect::<Vec<_>>();
1104
1105            sqlx::query(
1106                "INSERT INTO dex_ex_batch_swap_traces (
1107                height,
1108                time,
1109                input,
1110                output,
1111                batch_input,
1112                batch_output,
1113                price_float,
1114                asset_start,
1115                asset_end,
1116                asset_hops,
1117                amount_hops,
1118               position_id_hops
1119            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1120            )
1121            .bind(height)
1122            .bind(time)
1123            .bind(BigDecimal::from(input.value()))
1124            .bind(BigDecimal::from(output.value()))
1125            .bind(BigDecimal::from(batch_input.value()))
1126            .bind(BigDecimal::from(batch_output.value()))
1127            .bind(price_float)
1128            .bind(asset_start.to_bytes())
1129            .bind(asset_end.to_bytes())
1130            .bind(asset_hops)
1131            .bind(amount_hops)
1132            .bind(position_id_hops)
1133            .execute(dbtx.as_mut())
1134            .await?;
1135        }
1136
1137        Ok(())
1138    }
1139
1140    async fn record_block_summary(
1141        &self,
1142        dbtx: &mut PgTransaction<'_>,
1143        time: DateTime,
1144        height: i32,
1145        events: &Events,
1146    ) -> anyhow::Result<()> {
1147        let num_opened_lps = events.position_opens.len() as i32;
1148        let num_closed_lps = events.position_closes.len() as i32;
1149        let num_withdrawn_lps = events.position_withdrawals.len() as i32;
1150        let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
1151        let num_swap_claims = events
1152            .swap_claims
1153            .iter()
1154            .map(|(_, v)| v.len())
1155            .sum::<usize>() as i32;
1156        let num_txs = events.batch_swaps.len() as i32;
1157
1158        let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();
1159
1160        for event in &events.batch_swaps {
1161            let trading_pair = event.batch_swap_output_data.trading_pair;
1162
1163            if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
1164                let asset_start = swap_1_2.input.asset_id;
1165                let asset_end = swap_1_2.output.asset_id;
1166                let input = swap_1_2.input.amount;
1167                let output = swap_1_2.output.amount;
1168                let price_float = (output.value() as f64) / (input.value() as f64);
1169
1170                let empty_vec = vec![];
1171                let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1172                let filtered_swaps: Vec<_> = swaps_for_pair
1173                    .iter()
1174                    .filter(|swap| swap.delta_1_i != Amount::zero())
1175                    .collect::<Vec<_>>();
1176                let num_swaps = filtered_swaps.len() as i32;
1177
1178                batch_swap_summaries.push(BatchSwapSummary {
1179                    asset_start,
1180                    asset_end,
1181                    input,
1182                    output,
1183                    num_swaps,
1184                    price_float,
1185                });
1186            }
1187
1188            if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
1189                let asset_start = swap_2_1.input.asset_id;
1190                let asset_end = swap_2_1.output.asset_id;
1191                let input = swap_2_1.input.amount;
1192                let output = swap_2_1.output.amount;
1193                let price_float = (output.value() as f64) / (input.value() as f64);
1194
1195                let empty_vec = vec![];
1196                let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1197                let filtered_swaps: Vec<_> = swaps_for_pair
1198                    .iter()
1199                    .filter(|swap| swap.delta_2_i != Amount::zero())
1200                    .collect::<Vec<_>>();
1201                let num_swaps = filtered_swaps.len() as i32;
1202
1203                batch_swap_summaries.push(BatchSwapSummary {
1204                    asset_start,
1205                    asset_end,
1206                    input,
1207                    output,
1208                    num_swaps,
1209                    price_float,
1210                });
1211            }
1212        }
1213
1214        sqlx::query(
1215            "INSERT INTO dex_ex_block_summary (
1216            height,
1217            time,
1218            batch_swaps,
1219            num_open_lps,
1220            num_closed_lps,
1221            num_withdrawn_lps,
1222            num_swaps,
1223            num_swap_claims,
1224            num_txs
1225        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
1226        )
1227        .bind(height)
1228        .bind(time)
1229        .bind(serde_json::to_value(&batch_swap_summaries)?)
1230        .bind(num_opened_lps)
1231        .bind(num_closed_lps)
1232        .bind(num_withdrawn_lps)
1233        .bind(num_swaps)
1234        .bind(num_swap_claims)
1235        .bind(num_txs)
1236        .execute(dbtx.as_mut())
1237        .await?;
1238
1239        Ok(())
1240    }
1241
1242    async fn record_batch_swap_traces(
1243        &self,
1244        dbtx: &mut PgTransaction<'_>,
1245        time: DateTime,
1246        height: i32,
1247        event: &EventBatchSwap,
1248    ) -> anyhow::Result<()> {
1249        let EventBatchSwap {
1250            batch_swap_output_data: _,
1251            swap_execution_1_for_2,
1252            swap_execution_2_for_1,
1253        } = event;
1254
1255        if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
1256            self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
1257                .await?;
1258        }
1259
1260        if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
1261            self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
1262                .await?;
1263        }
1264
1265        Ok(())
1266    }
1267
1268    async fn record_position_execution(
1269        &self,
1270        dbtx: &mut PgTransaction<'_>,
1271        time: DateTime,
1272        height: i32,
1273        event: &EventPositionExecution,
1274        positions: &BTreeMap<PositionId, Position>,
1275    ) -> anyhow::Result<()> {
1276        // Get the position that was executed against
1277        let position = positions
1278            .get(&event.position_id)
1279            .expect("position must exist for execution");
1280        let current = Reserves {
1281            r1: event.reserves_1,
1282            r2: event.reserves_2,
1283        };
1284        let prev = Reserves {
1285            r1: event.prev_reserves_1,
1286            r2: event.prev_reserves_2,
1287        };
1288        let flows = Flows::from_phi_and_reserves(&position.phi, &current, &prev);
1289
1290        // First insert the reserves and get the rowid
1291        let reserves_rowid = sqlx::query_scalar::<_, i32>(
1292            "INSERT INTO dex_ex_position_reserves (
1293                position_id,
1294                height,
1295                time,
1296                reserves_1,
1297                reserves_2
1298            ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1299        )
1300        .bind(event.position_id.0)
1301        .bind(height)
1302        .bind(time)
1303        .bind(BigDecimal::from(event.reserves_1.value()))
1304        .bind(BigDecimal::from(event.reserves_2.value()))
1305        .fetch_one(dbtx.as_mut())
1306        .await?;
1307
1308        // Then record the execution with the reserves_rowid
1309        sqlx::query(
1310            "INSERT INTO dex_ex_position_executions (
1311                position_id,
1312                height,
1313                time,
1314                reserves_rowid,
1315                delta_1,
1316                delta_2,
1317                lambda_1,
1318                lambda_2,
1319                fee_1,
1320                fee_2,
1321                context_asset_start,
1322                context_asset_end
1323            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1324        )
1325        .bind(event.position_id.0)
1326        .bind(height)
1327        .bind(time)
1328        .bind(reserves_rowid)
1329        .bind(BigDecimal::from(flows.delta_1().value()))
1330        .bind(BigDecimal::from(flows.delta_2().value()))
1331        .bind(BigDecimal::from(flows.lambda_1().value()))
1332        .bind(BigDecimal::from(flows.lambda_2().value()))
1333        .bind(BigDecimal::from(flows.fee_1().value()))
1334        .bind(BigDecimal::from(flows.fee_2().value()))
1335        .bind(event.context.start.to_bytes())
1336        .bind(event.context.end.to_bytes())
1337        .execute(dbtx.as_mut())
1338        .await?;
1339
1340        Ok(())
1341    }
1342
1343    async fn record_position_close(
1344        &self,
1345        dbtx: &mut PgTransaction<'_>,
1346        time: DateTime,
1347        height: i32,
1348        tx_hash: Option<[u8; 32]>,
1349        event: &EventPositionClose,
1350    ) -> anyhow::Result<()> {
1351        sqlx::query(
1352            "UPDATE dex_ex_position_state 
1353             SET closing_time = $1,
1354                 closing_height = $2,
1355                 closing_tx = $3
1356             WHERE position_id = $4",
1357        )
1358        .bind(time)
1359        .bind(height)
1360        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1361        .bind(event.position_id.0)
1362        .execute(dbtx.as_mut())
1363        .await?;
1364
1365        Ok(())
1366    }
1367
1368    async fn record_position_withdraw(
1369        &self,
1370        dbtx: &mut PgTransaction<'_>,
1371        time: DateTime,
1372        height: i32,
1373        tx_hash: Option<[u8; 32]>,
1374        event: &EventPositionWithdraw,
1375    ) -> anyhow::Result<()> {
1376        // First insert the final reserves state (zeros after withdrawal)
1377        let reserves_rowid = sqlx::query_scalar::<_, i32>(
1378            "INSERT INTO dex_ex_position_reserves (
1379                position_id,
1380                height,
1381                time,
1382                reserves_1,
1383                reserves_2
1384            ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", // Using $4 twice for zero values
1385        )
1386        .bind(event.position_id.0)
1387        .bind(height)
1388        .bind(time)
1389        .bind(BigDecimal::from(0)) // Both reserves become zero after withdrawal
1390        .fetch_one(dbtx.as_mut())
1391        .await?;
1392
1393        sqlx::query(
1394            "INSERT INTO dex_ex_position_withdrawals (
1395                position_id,
1396                height,
1397                time,
1398                withdrawal_tx,
1399                sequence,
1400                reserves_1,
1401                reserves_2,
1402                reserves_rowid
1403            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1404        )
1405        .bind(event.position_id.0)
1406        .bind(height)
1407        .bind(time)
1408        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1409        .bind(event.sequence as i32)
1410        .bind(BigDecimal::from(event.reserves_1.value()))
1411        .bind(BigDecimal::from(event.reserves_2.value()))
1412        .bind(reserves_rowid)
1413        .execute(dbtx.as_mut())
1414        .await?;
1415
1416        Ok(())
1417    }
1418
1419    async fn record_transaction(
1420        &self,
1421        dbtx: &mut PgTransaction<'_>,
1422        time: DateTime,
1423        height: u64,
1424        transaction_id: [u8; 32],
1425        transaction: Transaction,
1426    ) -> anyhow::Result<()> {
1427        if transaction.transaction_body.actions.is_empty() {
1428            return Ok(());
1429        }
1430        sqlx::query(
1431            "INSERT INTO dex_ex_transactions (
1432                transaction_id,
1433                transaction,
1434                height,
1435                time
1436            ) VALUES ($1, $2, $3, $4)
1437            ",
1438        )
1439        .bind(transaction_id)
1440        .bind(transaction.encode_to_vec())
1441        .bind(i32::try_from(height)?)
1442        .bind(time)
1443        .execute(dbtx.as_mut())
1444        .await?;
1445
1446        Ok(())
1447    }
1448
1449    async fn record_all_transactions(
1450        &self,
1451        dbtx: &mut PgTransaction<'_>,
1452        time: DateTime,
1453        block: &BlockEvents,
1454    ) -> anyhow::Result<()> {
1455        for (tx_id, tx_bytes) in block.transactions() {
1456            let tx = Transaction::try_from(tx_bytes)?;
1457            let height = block.height();
1458            self.record_transaction(dbtx, time, height, tx_id, tx)
1459                .await?;
1460        }
1461        Ok(())
1462    }
1463}
1464
1465#[async_trait]
1466impl AppView for Component {
1467    async fn init_chain(
1468        &self,
1469        _dbtx: &mut PgTransaction,
1470        _: &serde_json::Value,
1471    ) -> Result<(), anyhow::Error> {
1472        Ok(())
1473    }
1474
1475    fn name(&self) -> String {
1476        "dex_ex".to_string()
1477    }
1478
1479    fn version(&self) -> Version {
1480        Version::with_major(1)
1481    }
1482
1483    async fn reset(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1484        for statement in include_str!("reset.sql").split(";") {
1485            sqlx::query(statement).execute(dbtx.as_mut()).await?;
1486        }
1487        Ok(())
1488    }
1489
1490    async fn on_startup(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1491        for statement in include_str!("schema.sql").split(";") {
1492            sqlx::query(statement).execute(dbtx.as_mut()).await?;
1493        }
1494        Ok(())
1495    }
1496
1497    async fn index_batch(
1498        &self,
1499        dbtx: &mut PgTransaction,
1500        batch: EventBatch,
1501        ctx: EventBatchContext,
1502    ) -> Result<(), anyhow::Error> {
1503        metadata::set(dbtx, self.denom).await?;
1504        let mut charts = HashMap::new();
1505        let mut snapshots = HashMap::new();
1506        let mut last_time = None;
1507        for block in batch.events_by_block() {
1508            let mut events = Events::extract(&block)?;
1509            let time = events
1510                .time
1511                .expect(&format!("no block root event at height {}", block.height()));
1512            last_time = Some(time);
1513
1514            self.record_all_transactions(dbtx, time, block).await?;
1515
1516            // Load any missing positions before processing events
1517            events.load_positions(dbtx).await?;
1518
1519            // This is where we are going to build the block summary for the DEX.
1520            self.record_block_summary(dbtx, time, block.height() as i32, &events)
1521                .await?;
1522
1523            // Record batch swap execution traces.
1524            for event in &events.batch_swaps {
1525                self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
1526                    .await?;
1527            }
1528
1529            // Record position opens
1530            for event in &events.position_opens {
1531                let tx_hash = events.position_open_txs.get(&event.position_id).copied();
1532                self.record_position_open(dbtx, time, events.height, tx_hash, event)
1533                    .await?;
1534            }
1535
1536            // Process position executions
1537            for event in &events.position_executions {
1538                self.record_position_execution(dbtx, time, events.height, event, &events.positions)
1539                    .await?;
1540            }
1541
1542            // Record position closes
1543            for event in &events.position_closes {
1544                let tx_hash = events.position_close_txs.get(&event.position_id).copied();
1545                self.record_position_close(dbtx, time, events.height, tx_hash, event)
1546                    .await?;
1547            }
1548
1549            // Record position withdrawals
1550            for event in &events.position_withdrawals {
1551                let tx_hash = events
1552                    .position_withdrawal_txs
1553                    .get(&event.position_id)
1554                    .copied();
1555                self.record_position_withdraw(dbtx, time, events.height, tx_hash, event)
1556                    .await?;
1557            }
1558
1559            for (pair, candle) in &events.candles {
1560                sqlx::query(
1561                    "INSERT INTO dex_ex.candles VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
1562                )
1563                .bind(i64::try_from(block.height())?)
1564                .bind(time)
1565                .bind(pair.start.to_bytes())
1566                .bind(pair.end.to_bytes())
1567                .bind(candle.open)
1568                .bind(candle.close)
1569                .bind(candle.low)
1570                .bind(candle.high)
1571                .bind(candle.direct_volume)
1572                .bind(candle.swap_volume)
1573                .execute(dbtx.as_mut())
1574                .await?;
1575                for window in Window::all() {
1576                    let key = (pair.start, pair.end, window);
1577                    if !charts.contains_key(&key) {
1578                        let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?;
1579                        charts.insert(key, ctx);
1580                    }
1581                    charts
1582                        .get_mut(&key)
1583                        .unwrap() // safe because we just inserted above
1584                        .update(dbtx, time, *candle)
1585                        .await?;
1586                }
1587            }
1588
1589            let block_pairs = events
1590                .candles
1591                .keys()
1592                .chain(events.metrics.keys())
1593                .copied()
1594                .collect::<HashSet<_>>();
1595            for pair in block_pairs {
1596                if !snapshots.contains_key(&pair) {
1597                    let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?;
1598                    snapshots.insert(pair, ctx);
1599                }
1600                // NOPANIC: inserted above
1601                snapshots
1602                    .get_mut(&pair)
1603                    .unwrap()
1604                    .update(
1605                        dbtx,
1606                        time,
1607                        events.candles.get(&pair).copied(),
1608                        events.metrics.get(&pair).copied().unwrap_or_default(),
1609                        events.price_for(self.denom, pair.start),
1610                    )
1611                    .await?;
1612            }
1613        }
1614
1615        if ctx.is_last() {
1616            if let Some(now) = last_time {
1617                for window in Window::all() {
1618                    summary::update_summaries(dbtx, now, window).await?;
1619                    summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity)
1620                        .await?;
1621                }
1622            }
1623        }
1624        for chart in charts.into_values() {
1625            chart.unload(dbtx).await?;
1626        }
1627        Ok(())
1628    }
1629}