pindexer/dex_ex/
mod.rs

1use anyhow::anyhow;
2use clap::Result;
3use cometindex::{
4    async_trait,
5    index::{BlockEvents, EventBatch, EventBatchContext, Version},
6    AppView, PgTransaction,
7};
8use penumbra_sdk_asset::{asset, Value, STAKING_TOKEN_ASSET_ID};
9use penumbra_sdk_dex::{
10    event::{
11        EventArbExecution, EventBatchSwap, EventPositionClose, EventPositionExecution,
12        EventPositionOpen, EventPositionWithdraw, EventQueuePositionClose,
13    },
14    lp::{position::Flows, Reserves},
15    DirectedTradingPair, SwapExecution, TradingPair,
16};
17use penumbra_sdk_dex::{
18    event::{EventSwap, EventSwapClaim},
19    lp::position::{Id as PositionId, Position},
20};
21use penumbra_sdk_funding::event::EventLqtPositionReward;
22use penumbra_sdk_num::Amount;
23use penumbra_sdk_proto::event::EventDomainType;
24use penumbra_sdk_proto::DomainType;
25use penumbra_sdk_sct::event::EventBlockRoot;
26use penumbra_sdk_transaction::Transaction;
27use sqlx::types::BigDecimal;
28use sqlx::Row;
29use std::collections::{BTreeMap, HashMap, HashSet};
30
31type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
32
33mod candle {
34    use super::DateTime;
35    use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
36    use penumbra_sdk_dex::CandlestickData;
37    use std::fmt::Display;
38
39    /// Candlestick data, unmoored from the prison of a particular block height.
40    ///
41    /// In other words, this can represent candlesticks which span arbitrary windows,
42    /// and not just a single block.
43    #[derive(Debug, Clone, Copy)]
44    pub struct Candle {
45        pub open: f64,
46        pub close: f64,
47        pub low: f64,
48        pub high: f64,
49        pub direct_volume: f64,
50        pub swap_volume: f64,
51    }
52
53    impl Candle {
54        pub fn from_candlestick_data(data: &CandlestickData) -> Self {
55            // The volume is tracked in terms of input asset.
56            // We can use the closing price (aka. the common clearing price) to convert
57            // the volume to the other direction i.e, the batch swap output.
58            Self {
59                open: data.open,
60                close: data.close,
61                low: data.low,
62                high: data.high,
63                direct_volume: data.direct_volume,
64                swap_volume: data.swap_volume,
65            }
66        }
67
68        pub fn point(price: f64, direct_volume: f64) -> Self {
69            Self {
70                open: price,
71                close: price,
72                low: price,
73                high: price,
74                direct_volume,
75                swap_volume: 0.0,
76            }
77        }
78
79        pub fn merge(&mut self, that: &Self) {
80            self.close = that.close;
81            self.low = self.low.min(that.low);
82            self.high = self.high.max(that.high);
83            self.direct_volume += that.direct_volume;
84            self.swap_volume += that.swap_volume;
85        }
86    }
87
88    #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
89    pub enum Window {
90        W1m,
91        W15m,
92        W1h,
93        W4h,
94        W1d,
95        W1w,
96        W1mo,
97    }
98
99    impl Window {
100        pub fn all() -> impl Iterator<Item = Self> {
101            [
102                Window::W1m,
103                Window::W15m,
104                Window::W1h,
105                Window::W4h,
106                Window::W1d,
107                Window::W1w,
108                Window::W1mo,
109            ]
110            .into_iter()
111        }
112
113        /// Get the anchor for a given time.
114        ///
115        /// This is the latest time that "snaps" to a given anchor, dependent on the window.
116        ///
117        /// For example, the 1 minute window has an anchor every minute, the day window
118        /// every day, etc.
119        pub fn anchor(&self, time: DateTime) -> DateTime {
120            let (y, mo, d, h, m) = (
121                time.year(),
122                time.month(),
123                time.day(),
124                time.hour(),
125                time.minute(),
126            );
127            let out = match self {
128                Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
129                Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
130                Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
131                Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
132                Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
133                Window::W1w => Utc
134                    .with_ymd_and_hms(y, mo, d, 0, 0, 0)
135                    .single()
136                    .and_then(|x| {
137                        x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
138                    }),
139                Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
140            };
141            out.unwrap()
142        }
143
144        pub fn subtract_from(&self, time: DateTime) -> DateTime {
145            let delta = match self {
146                Window::W1m => TimeDelta::minutes(1),
147                Window::W15m => TimeDelta::minutes(15),
148                Window::W1h => TimeDelta::hours(1),
149                Window::W4h => TimeDelta::hours(4),
150                Window::W1d => TimeDelta::days(1),
151                Window::W1w => TimeDelta::weeks(1),
152                Window::W1mo => TimeDelta::days(30),
153            };
154            time - delta
155        }
156    }
157
158    impl Display for Window {
159        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160            use Window::*;
161            let str = match self {
162                W1m => "1m",
163                W15m => "15m",
164                W1h => "1h",
165                W4h => "4h",
166                W1d => "1d",
167                W1w => "1w",
168                W1mo => "1mo",
169            };
170            write!(f, "{}", str)
171        }
172    }
173
174    #[derive(Debug)]
175    pub struct WindowedCandle {
176        start: DateTime,
177        window: Window,
178        candle: Candle,
179    }
180
181    impl WindowedCandle {
182        pub fn new(now: DateTime, window: Window, candle: Candle) -> Self {
183            Self {
184                start: window.anchor(now),
185                window,
186                candle,
187            }
188        }
189
190        /// Update with a new candlestick, at a given time.
191        ///
192        /// This may return the old candlestick and its start time, if we should be starting
193        /// a new candle, based on the candle for that window having already been closed.
194        pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> {
195            let start = self.window.anchor(now);
196            // This candle belongs to the next window!
197            if start > self.start {
198                let old_start = std::mem::replace(&mut self.start, start);
199                let old_candle = std::mem::replace(&mut self.candle, candle);
200                Some((old_start, old_candle))
201            } else {
202                self.candle.merge(&candle);
203                None
204            }
205        }
206
207        pub fn window(&self) -> Window {
208            self.window
209        }
210
211        pub fn flush(self) -> (DateTime, Candle) {
212            (self.start, self.candle)
213        }
214    }
215}
216pub use candle::{Candle, Window, WindowedCandle};
217
218mod price_chart {
219    use super::*;
220
221    /// A context when processing a price chart.
222    #[derive(Debug)]
223    pub struct Context {
224        asset_start: asset::Id,
225        asset_end: asset::Id,
226        window: Window,
227        state: Option<WindowedCandle>,
228    }
229
230    impl Context {
231        pub async fn load(
232            dbtx: &mut PgTransaction<'_>,
233            asset_start: asset::Id,
234            asset_end: asset::Id,
235            window: Window,
236        ) -> anyhow::Result<Self> {
237            let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as(
238                "
239                SELECT open, close, high, low, direct_volume, swap_volume, start_time
240                FROM dex_ex_price_charts
241                WHERE asset_start = $1
242                AND asset_end = $2
243                AND the_window = $3
244                ORDER BY start_time DESC
245                LIMIT 1
246            ",
247            )
248            .bind(asset_start.to_bytes())
249            .bind(asset_end.to_bytes())
250            .bind(window.to_string())
251            .fetch_optional(dbtx.as_mut())
252            .await?;
253            let state = row.map(
254                |(open, close, high, low, direct_volume, swap_volume, start)| {
255                    let candle = Candle {
256                        open,
257                        close,
258                        low,
259                        high,
260                        direct_volume,
261                        swap_volume,
262                    };
263                    WindowedCandle::new(start, window, candle)
264                },
265            );
266            Ok(Self {
267                asset_start,
268                asset_end,
269                window,
270                state,
271            })
272        }
273
274        async fn write_candle(
275            &self,
276            dbtx: &mut PgTransaction<'_>,
277            start: DateTime,
278            candle: Candle,
279        ) -> anyhow::Result<()> {
280            sqlx::query(
281                "
282            INSERT INTO dex_ex_price_charts(
283                id, asset_start, asset_end, the_window,
284                start_time, open, close, high, low, direct_volume, swap_volume
285            ) 
286            VALUES(
287                DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10
288            )
289            ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET
290                open = EXCLUDED.open,
291                close = EXCLUDED.close,
292                high = EXCLUDED.high,
293                low = EXCLUDED.low,
294                direct_volume = EXCLUDED.direct_volume,
295                swap_volume = EXCLUDED.swap_volume
296            ",
297            )
298            .bind(self.asset_start.to_bytes())
299            .bind(self.asset_end.to_bytes())
300            .bind(self.window.to_string())
301            .bind(start)
302            .bind(candle.open)
303            .bind(candle.close)
304            .bind(candle.high)
305            .bind(candle.low)
306            .bind(candle.direct_volume)
307            .bind(candle.swap_volume)
308            .execute(dbtx.as_mut())
309            .await?;
310            Ok(())
311        }
312
313        pub async fn update(
314            &mut self,
315            dbtx: &mut PgTransaction<'_>,
316            now: DateTime,
317            candle: Candle,
318        ) -> anyhow::Result<()> {
319            let state = match self.state.as_mut() {
320                Some(x) => x,
321                None => {
322                    self.state = Some(WindowedCandle::new(now, self.window, candle));
323                    self.state.as_mut().unwrap()
324                }
325            };
326            if let Some((start, old_candle)) = state.with_candle(now, candle) {
327                self.write_candle(dbtx, start, old_candle).await?;
328            };
329            Ok(())
330        }
331
332        pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
333            let state = std::mem::replace(&mut self.state, None);
334            if let Some(state) = state {
335                let (start, candle) = state.flush();
336                self.write_candle(dbtx, start, candle).await?;
337            }
338            Ok(())
339        }
340    }
341}
342
343use price_chart::Context as PriceChartContext;
344
345mod summary {
346    use cometindex::PgTransaction;
347    use penumbra_sdk_asset::asset;
348
349    use super::{Candle, DateTime, PairMetrics, Window};
350
351    pub struct Context {
352        start: asset::Id,
353        end: asset::Id,
354        price: f64,
355        liquidity: f64,
356        start_price_indexing_denom: f64,
357    }
358
359    impl Context {
360        pub async fn load(
361            dbtx: &mut PgTransaction<'_>,
362            start: asset::Id,
363            end: asset::Id,
364        ) -> anyhow::Result<Self> {
365            let row: Option<(f64, f64, f64)> = sqlx::query_as(
366                "
367                SELECT price, liquidity, start_price_indexing_denom
368                FROM dex_ex_pairs_block_snapshot
369                WHERE asset_start = $1
370                AND asset_end = $2
371                ORDER BY id DESC
372                LIMIT 1
373            ",
374            )
375            .bind(start.to_bytes())
376            .bind(end.to_bytes())
377            .fetch_optional(dbtx.as_mut())
378            .await?;
379            let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
380            Ok(Self {
381                start,
382                end,
383                price,
384                liquidity,
385                start_price_indexing_denom,
386            })
387        }
388
389        pub async fn update(
390            &mut self,
391            dbtx: &mut PgTransaction<'_>,
392            now: DateTime,
393            candle: Option<Candle>,
394            metrics: PairMetrics,
395            start_price_indexing_denom: Option<f64>,
396        ) -> anyhow::Result<()> {
397            if let Some(candle) = candle {
398                self.price = candle.close;
399            }
400            if let Some(price) = start_price_indexing_denom {
401                self.start_price_indexing_denom = price;
402            }
403            self.liquidity += metrics.liquidity_change;
404
405            sqlx::query(
406                "
407            INSERT INTO dex_ex_pairs_block_snapshot VALUES (
408                DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
409            )        
410        ",
411            )
412            .bind(now)
413            .bind(self.start.to_bytes())
414            .bind(self.end.to_bytes())
415            .bind(self.price)
416            .bind(self.liquidity)
417            .bind(candle.map(|x| x.direct_volume).unwrap_or_default())
418            .bind(candle.map(|x| x.swap_volume).unwrap_or_default())
419            .bind(self.start_price_indexing_denom)
420            .bind(metrics.trades)
421            .execute(dbtx.as_mut())
422            .await?;
423            Ok(())
424        }
425    }
426
427    pub async fn update_summaries(
428        dbtx: &mut PgTransaction<'_>,
429        now: DateTime,
430        window: Window,
431    ) -> anyhow::Result<()> {
432        let then = window.subtract_from(now);
433        let pairs = sqlx::query_as::<_, (Vec<u8>, Vec<u8>)>(
434            "SELECT asset_start, asset_end FROM dex_ex_pairs_block_snapshot GROUP BY asset_start, asset_end",
435        )
436        .fetch_all(dbtx.as_mut())
437        .await?;
438        for (start, end) in pairs {
439            sqlx::query(
440                "
441            WITH
442            snapshots AS (
443                SELECT *
444                FROM dex_ex_pairs_block_snapshot
445                WHERE asset_start = $1
446                AND asset_end = $2
447            ),
448            previous AS (
449                SELECT price AS price_then, liquidity AS liquidity_then
450                FROM snapshots
451                WHERE time <= $4
452                ORDER BY time DESC
453                LIMIT 1
454            ),
455            previous_or_default AS (
456                SELECT
457                    COALESCE((SELECT price_then FROM previous), 0.0) AS price_then,
458                    COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then
459            ),
460            now AS (
461                SELECT price, liquidity            
462                FROM snapshots
463                WHERE time <= $3
464                ORDER BY time DESC
465                LIMIT 1
466            ),
467            sums AS (
468                SELECT
469                    COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
470                    COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
471                    COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
472                    COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
473                    COALESCE(SUM(trades), 0.0) AS trades_over_window,
474                    COALESCE(MIN(price), 0.0) AS low,
475                    COALESCE(MAX(price), 0.0) AS high
476                FROM snapshots
477                WHERE time <= $3
478                AND time >= $4
479            )
480            INSERT INTO dex_ex_pairs_summary
481            SELECT 
482                $1, $2, $5,
483                price, price_then,
484                low, high,
485                liquidity, liquidity_then,
486                direct_volume_over_window,
487                swap_volume_over_window,
488                direct_volume_indexing_denom_over_window,
489                swap_volume_indexing_denom_over_window,
490                trades_over_window
491            FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
492            ON CONFLICT (asset_start, asset_end, the_window)
493            DO UPDATE SET
494                price = EXCLUDED.price,
495                price_then = EXCLUDED.price_then,
496                liquidity = EXCLUDED.liquidity,
497                liquidity_then = EXCLUDED.liquidity_then,
498                direct_volume_over_window = EXCLUDED.direct_volume_over_window,
499                swap_volume_over_window = EXCLUDED.swap_volume_over_window,
500                direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
501                swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
502                trades_over_window = EXCLUDED.trades_over_window
503            ",
504            )
505            .bind(start)
506            .bind(end)
507            .bind(now)
508            .bind(then)
509            .bind(window.to_string())
510            .execute(dbtx.as_mut())
511            .await?;
512        }
513        Ok(())
514    }
515
516    pub async fn update_aggregate_summary(
517        dbtx: &mut PgTransaction<'_>,
518        window: Window,
519        denom: asset::Id,
520        min_liquidity: f64,
521    ) -> anyhow::Result<()> {
522        // TODO: do something here
523        sqlx::query(
524            "
525        WITH       
526            eligible_denoms AS (
527                SELECT asset_start as asset, price
528                FROM dex_ex_pairs_summary
529                WHERE asset_end = $1
530                AND liquidity >= $2
531                UNION VALUES ($1, 1.0)
532            ),
533            converted_pairs_summary AS (
534                SELECT
535                    asset_start, asset_end,
536                    (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
537                    liquidity * ed_end.price AS liquidity,
538                    direct_volume_indexing_denom_over_window AS dv,
539                    swap_volume_indexing_denom_over_window AS sv,
540                    trades_over_window as trades
541                FROM dex_ex_pairs_summary
542                JOIN eligible_denoms AS ed_end
543                ON ed_end.asset = asset_end
544                JOIN eligible_denoms AS ed_start
545                ON ed_start.asset = asset_start
546                WHERE the_window = $3
547            ),
548            sums AS (
549                SELECT
550                    SUM(dv) AS direct_volume,
551                    SUM(sv) AS swap_volume,
552                    SUM(trades) AS trades
553                FROM converted_pairs_summary WHERE asset_start < asset_end
554            ),
555            total_liquidity AS (
556                SELECT
557                    SUM(liquidity) AS liquidity
558                FROM converted_pairs_summary
559            ),
560            undirected_pairs_summary AS (
561              WITH pairs AS (
562                  SELECT asset_start AS a_start, asset_end AS a_end FROM converted_pairs_summary WHERE asset_start < asset_end
563              )
564              SELECT
565                  a_start AS asset_start,
566                  a_end AS asset_end,
567                  (SELECT SUM(sv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS sv,
568                  (SELECT SUM(dv) FROM converted_pairs_summary WHERE (asset_start = a_start AND asset_end = a_end) OR (asset_start = a_end AND asset_end = a_start)) AS dv
569              FROM pairs
570            ),
571            counts AS (
572              SELECT COUNT(*) AS active_pairs FROM undirected_pairs_summary WHERE dv > 0
573            ),
574            largest_sv AS (
575                SELECT
576                    asset_start AS largest_sv_trading_pair_start,
577                    asset_end AS largest_sv_trading_pair_end,
578                    sv AS largest_sv_trading_pair_volume                    
579                FROM undirected_pairs_summary
580                ORDER BY sv DESC
581                LIMIT 1
582            ),
583            largest_dv AS (
584                SELECT
585                    asset_start AS largest_dv_trading_pair_start,
586                    asset_end AS largest_dv_trading_pair_end,
587                    dv AS largest_dv_trading_pair_volume                    
588                FROM undirected_pairs_summary
589                ORDER BY dv DESC
590                LIMIT 1
591            ),
592            top_price_mover AS (
593                SELECT
594                    asset_start AS top_price_mover_start,
595                    asset_end AS top_price_mover_end,
596                    price_change AS top_price_mover_change_percent
597                FROM converted_pairs_summary
598                ORDER BY price_change DESC
599                LIMIT 1
600            )
601        INSERT INTO dex_ex_aggregate_summary
602        SELECT
603            $3,
604            direct_volume, swap_volume, liquidity, trades, active_pairs,
605            largest_sv_trading_pair_start,
606            largest_sv_trading_pair_end,
607            largest_sv_trading_pair_volume,
608            largest_dv_trading_pair_start,
609            largest_dv_trading_pair_end,
610            largest_dv_trading_pair_volume,
611            top_price_mover_start,
612            top_price_mover_end,
613            top_price_mover_change_percent
614        FROM
615            sums
616        JOIN largest_sv ON TRUE
617        JOIN largest_dv ON TRUE
618        JOIN top_price_mover ON TRUE
619        JOIN total_liquidity ON TRUE
620        JOIN counts ON TRUE
621        ON CONFLICT (the_window) DO UPDATE SET
622            direct_volume = EXCLUDED.direct_volume,
623            swap_volume = EXCLUDED.swap_volume,
624            liquidity = EXCLUDED.liquidity,
625            trades = EXCLUDED.trades,
626            active_pairs = EXCLUDED.active_pairs,          
627            largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start,
628            largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end,
629            largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume,
630            largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start,
631            largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end,
632            largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume,
633            top_price_mover_start = EXCLUDED.top_price_mover_start,
634            top_price_mover_end = EXCLUDED.top_price_mover_end,
635            top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent
636        ")
637        .bind(denom.to_bytes())
638        .bind(min_liquidity)
639        .bind(window.to_string())
640        .execute(dbtx.as_mut())
641        .await?;
642        Ok(())
643    }
644}
645
646mod metadata {
647    use super::*;
648
649    pub async fn set(dbtx: &mut PgTransaction<'_>, quote_asset: asset::Id) -> anyhow::Result<()> {
650        sqlx::query(
651            "
652        INSERT INTO dex_ex_metadata
653        VALUES (1, $1)
654        ON CONFLICT (id) DO UPDATE 
655        SET id = EXCLUDED.id,
656            quote_asset_id = EXCLUDED.quote_asset_id
657        ",
658        )
659        .bind(quote_asset.to_bytes())
660        .execute(dbtx.as_mut())
661        .await?;
662        Ok(())
663    }
664}
665
666#[derive(Debug, Default, Clone, Copy)]
667struct PairMetrics {
668    trades: f64,
669    liquidity_change: f64,
670}
671
672#[derive(Debug, Clone, serde::Serialize)]
673struct BatchSwapSummary {
674    asset_start: asset::Id,
675    asset_end: asset::Id,
676    input: Amount,
677    output: Amount,
678    num_swaps: i32,
679    price_float: f64,
680}
681
682#[derive(Debug)]
683struct Events {
684    time: Option<DateTime>,
685    height: i32,
686    candles: HashMap<DirectedTradingPair, Candle>,
687    metrics: HashMap<DirectedTradingPair, PairMetrics>,
688    // Relevant positions.
689    positions: BTreeMap<PositionId, Position>,
690    // Store events
691    position_opens: Vec<EventPositionOpen>,
692    position_executions: Vec<EventPositionExecution>,
693    position_closes: Vec<EventPositionClose>,
694    position_withdrawals: Vec<EventPositionWithdraw>,
695    batch_swaps: Vec<EventBatchSwap>,
696    swaps: BTreeMap<TradingPair, Vec<EventSwap>>,
697    swap_claims: BTreeMap<TradingPair, Vec<EventSwapClaim>>,
698    // Track transaction hashes by position ID
699    position_open_txs: BTreeMap<PositionId, [u8; 32]>,
700    position_close_txs: BTreeMap<PositionId, [u8; 32]>,
701    position_withdrawal_txs: BTreeMap<PositionId, [u8; 32]>,
702}
703
704impl Events {
705    fn new() -> Self {
706        Self {
707            time: None,
708            height: 0,
709            candles: HashMap::new(),
710            metrics: HashMap::new(),
711            positions: BTreeMap::new(),
712            position_opens: Vec::new(),
713            position_executions: Vec::new(),
714            position_closes: Vec::new(),
715            position_withdrawals: Vec::new(),
716            batch_swaps: Vec::new(),
717            swaps: BTreeMap::new(),
718            swap_claims: BTreeMap::new(),
719            position_open_txs: BTreeMap::new(),
720            position_close_txs: BTreeMap::new(),
721            position_withdrawal_txs: BTreeMap::new(),
722        }
723    }
724
725    fn with_time(&mut self, time: DateTime) {
726        self.time = Some(time)
727    }
728
729    fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
730        if !self.metrics.contains_key(pair) {
731            self.metrics.insert(*pair, PairMetrics::default());
732        }
733        // NOPANIC: inserted above.
734        self.metrics.get_mut(pair).unwrap()
735    }
736
737    fn with_trace(&mut self, input: Value, output: Value) {
738        let pair_1_2 = DirectedTradingPair {
739            start: input.asset_id,
740            end: output.asset_id,
741        };
742        // This shouldn't happen, but to avoid weird stuff later, let's ignore this trace.
743        if pair_1_2.start == pair_1_2.end {
744            tracing::warn!(?input, ?output, "found trace with a loop?");
745            return;
746        }
747        let pair_2_1 = pair_1_2.flip();
748        let input_amount = f64::from(input.amount);
749        let output_amount = f64::from(output.amount);
750        let price_1_2 = output_amount / input_amount;
751        let candle_1_2 = Candle::point(price_1_2, input_amount);
752        let candle_2_1 = Candle::point(1.0 / price_1_2, output_amount);
753        self.metric(&pair_1_2).trades += 1.0;
754        self.candles
755            .entry(pair_1_2)
756            .and_modify(|c| c.merge(&candle_1_2))
757            .or_insert(candle_1_2);
758        self.metric(&pair_2_1).trades += 1.0;
759        self.candles
760            .entry(pair_2_1)
761            .and_modify(|c| c.merge(&candle_2_1))
762            .or_insert(candle_2_1);
763    }
764
765    fn with_swap_execution(&mut self, se: &SwapExecution) {
766        for row in se.traces.iter() {
767            for window in row.windows(2) {
768                self.with_trace(window[0], window[1]);
769            }
770        }
771        let pair_1_2 = DirectedTradingPair {
772            start: se.input.asset_id,
773            end: se.output.asset_id,
774        };
775        // When doing arb, we don't want to report the volume on UM -> UM,
776        // so we need this check.
777        if pair_1_2.start == pair_1_2.end {
778            return;
779        }
780        let pair_2_1 = pair_1_2.flip();
781        self.candles
782            .entry(pair_1_2)
783            .and_modify(|c| c.swap_volume += f64::from(se.input.amount));
784        self.candles
785            .entry(pair_2_1)
786            .and_modify(|c| c.swap_volume += f64::from(se.output.amount));
787    }
788
789    fn with_reserve_change(
790        &mut self,
791        pair: &TradingPair,
792        old_reserves: Option<Reserves>,
793        new_reserves: Reserves,
794        removed: bool,
795    ) {
796        let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
797            (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
798            (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
799            (_, Some(old), new) => (
800                (new.r1.value() as f64) - (old.r1.value() as f64),
801                (new.r2.value() as f64) - (old.r2.value() as f64),
802            ),
803        };
804        for (d_pair, diff) in [
805            (
806                DirectedTradingPair {
807                    start: pair.asset_1(),
808                    end: pair.asset_2(),
809                },
810                diff_2,
811            ),
812            (
813                DirectedTradingPair {
814                    start: pair.asset_2(),
815                    end: pair.asset_1(),
816                },
817                diff_1,
818            ),
819        ] {
820            self.metric(&d_pair).liquidity_change += diff;
821        }
822    }
823
824    pub fn extract(block: &BlockEvents, ignore_arb_executions: bool) -> anyhow::Result<Self> {
825        let mut out = Self::new();
826        out.height = block.height() as i32;
827
828        for event in block.events() {
829            if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
830                let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
831                    "creating timestamp should succeed; timestamp: {}",
832                    e.timestamp_seconds
833                ))?;
834                out.with_time(time);
835            } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
836                out.with_reserve_change(
837                    &e.trading_pair,
838                    None,
839                    Reserves {
840                        r1: e.reserves_1,
841                        r2: e.reserves_2,
842                    },
843                    false,
844                );
845                if let Some(tx_hash) = event.tx_hash() {
846                    out.position_open_txs.insert(e.position_id, tx_hash);
847                }
848                // A newly opened position might be executed against in this block,
849                // but wouldn't already be in the database. Adding it here ensures
850                // it's available.
851                out.positions.insert(e.position_id, e.position.clone());
852                out.position_opens.push(e);
853            } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
854                // TODO: use close positions to track liquidity more precisely, in practic I (ck) expect few
855                // positions to close with being withdrawn.
856                out.with_reserve_change(
857                    &e.trading_pair,
858                    None,
859                    Reserves {
860                        r1: e.reserves_1,
861                        r2: e.reserves_2,
862                    },
863                    true,
864                );
865                if let Some(tx_hash) = event.tx_hash() {
866                    out.position_withdrawal_txs.insert(e.position_id, tx_hash);
867                }
868                out.position_withdrawals.push(e);
869            } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
870                out.with_reserve_change(
871                    &e.trading_pair,
872                    Some(Reserves {
873                        r1: e.prev_reserves_1,
874                        r2: e.prev_reserves_2,
875                    }),
876                    Reserves {
877                        r1: e.reserves_1,
878                        r2: e.reserves_2,
879                    },
880                    false,
881                );
882                out.position_executions.push(e);
883            } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) {
884                out.position_closes.push(e);
885            } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
886                // The position close event is emitted by the dex module at EOB,
887                // so we need to track it with the tx hash of the closure tx.
888                if let Some(tx_hash) = event.tx_hash() {
889                    out.position_close_txs.insert(e.position_id, tx_hash);
890                }
891            } else if let Ok(e) = EventSwap::try_from_event(&event.event) {
892                out.swaps
893                    .entry(e.trading_pair)
894                    .or_insert_with(Vec::new)
895                    .push(e);
896            } else if let Ok(e) = EventSwapClaim::try_from_event(&event.event) {
897                out.swap_claims
898                    .entry(e.trading_pair)
899                    .or_insert_with(Vec::new)
900                    .push(e);
901            } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
902                let pair = DirectedTradingPair {
903                    start: e.incentivized_asset_id,
904                    end: *STAKING_TOKEN_ASSET_ID,
905                };
906                out.metric(&pair).liquidity_change += e.reward_amount.value() as f64;
907            } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
908                // NOTE: order matters here, 2 for 1 happened after.
909                if let Some(se) = e.swap_execution_1_for_2.as_ref() {
910                    out.with_swap_execution(se);
911                }
912                if let Some(se) = e.swap_execution_2_for_1.as_ref() {
913                    out.with_swap_execution(se);
914                }
915                out.batch_swaps.push(e);
916            } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) {
917                if !ignore_arb_executions {
918                    out.with_swap_execution(&e.swap_execution);
919                }
920            }
921        }
922        Ok(out)
923    }
924
925    async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> {
926        // Collect position IDs that we need but don't already have
927        let missing_positions: Vec<_> = self
928            .position_executions
929            .iter()
930            .map(|e| e.position_id)
931            .filter(|id| !self.positions.contains_key(id))
932            .collect();
933
934        if missing_positions.is_empty() {
935            return Ok(());
936        }
937
938        // Load missing positions from database
939        let rows = sqlx::query(
940            "SELECT position_raw 
941             FROM dex_ex_position_state 
942             WHERE position_id = ANY($1)",
943        )
944        .bind(
945            &missing_positions
946                .iter()
947                .map(|id| id.0.as_ref())
948                .collect::<Vec<_>>(),
949        )
950        .fetch_all(dbtx.as_mut())
951        .await?;
952
953        // Decode and store each position
954        for row in rows {
955            let position_raw: Vec<u8> = row.get("position_raw");
956            let position = Position::decode(position_raw.as_slice())?;
957            self.positions.insert(position.id(), position);
958        }
959
960        Ok(())
961    }
962
963    /// Attempt to find the price, relative to a given indexing denom, for a particular asset, in this block.
964    pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
965        self.candles
966            .get(&DirectedTradingPair::new(asset, indexing_denom))
967            .map(|x| x.close)
968    }
969}
970
971#[derive(Debug)]
972pub struct Component {
973    denom: asset::Id,
974    min_liquidity: f64,
975    ignore_arb_executions: bool,
976}
977
978impl Component {
979    pub fn new(denom: asset::Id, min_liquidity: f64, ignore_arb_executions: bool) -> Self {
980        Self {
981            denom,
982            min_liquidity,
983            ignore_arb_executions,
984        }
985    }
986
987    async fn record_position_open(
988        &self,
989        dbtx: &mut PgTransaction<'_>,
990        time: DateTime,
991        height: i32,
992        tx_hash: Option<[u8; 32]>,
993        event: &EventPositionOpen,
994    ) -> anyhow::Result<()> {
995        // Get effective prices by orienting the trading function in each direction
996        let effective_price_1_to_2: f64 = event
997            .position
998            .phi
999            .orient_start(event.trading_pair.asset_1())
1000            .expect("position trading pair matches")
1001            .effective_price()
1002            .into();
1003
1004        let effective_price_2_to_1: f64 = event
1005            .position
1006            .phi
1007            .orient_start(event.trading_pair.asset_2())
1008            .expect("position trading pair matches")
1009            .effective_price()
1010            .into();
1011
1012        // First insert initial reserves and get the rowid
1013        let opening_reserves_rowid = sqlx::query_scalar::<_, i32>(
1014            "INSERT INTO dex_ex_position_reserves (
1015                position_id,
1016                height,
1017                time,
1018                reserves_1,
1019                reserves_2
1020            ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1021        )
1022        .bind(event.position_id.0)
1023        .bind(height)
1024        .bind(time)
1025        .bind(BigDecimal::from(event.reserves_1.value()))
1026        .bind(BigDecimal::from(event.reserves_2.value()))
1027        .fetch_one(dbtx.as_mut())
1028        .await?;
1029
1030        // Then insert position state with the opening_reserves_rowid
1031        sqlx::query(
1032            "INSERT INTO dex_ex_position_state (
1033                position_id,
1034                asset_1,
1035                asset_2,
1036                p,
1037                q,
1038                close_on_fill,
1039                fee_bps,
1040                effective_price_1_to_2,
1041                effective_price_2_to_1,
1042                position_raw,
1043                opening_time,
1044                opening_height,
1045                opening_tx,
1046                opening_reserves_rowid
1047            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
1048        )
1049        .bind(event.position_id.0)
1050        .bind(event.trading_pair.asset_1().to_bytes())
1051        .bind(event.trading_pair.asset_2().to_bytes())
1052        .bind(BigDecimal::from(event.position.phi.component.p.value()))
1053        .bind(BigDecimal::from(event.position.phi.component.q.value()))
1054        .bind(event.position.close_on_fill)
1055        .bind(event.trading_fee as i32)
1056        .bind(effective_price_1_to_2)
1057        .bind(effective_price_2_to_1)
1058        .bind(event.position.encode_to_vec())
1059        .bind(time)
1060        .bind(height)
1061        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1062        .bind(opening_reserves_rowid)
1063        .execute(dbtx.as_mut())
1064        .await?;
1065
1066        Ok(())
1067    }
1068
1069    async fn record_swap_execution_traces(
1070        &self,
1071        dbtx: &mut PgTransaction<'_>,
1072        time: DateTime,
1073        height: i32,
1074        swap_execution: &SwapExecution,
1075    ) -> anyhow::Result<()> {
1076        let SwapExecution {
1077            traces,
1078            input: se_input,
1079            output: se_output,
1080        } = swap_execution;
1081
1082        let asset_start = se_input.asset_id;
1083        let asset_end = se_output.asset_id;
1084        let batch_input = se_input.amount;
1085        let batch_output = se_output.amount;
1086
1087        for trace in traces.iter() {
1088            let Some(input_value) = trace.first() else {
1089                continue;
1090            };
1091            let Some(output_value) = trace.last() else {
1092                continue;
1093            };
1094
1095            let input = input_value.amount;
1096            let output = output_value.amount;
1097
1098            let price_float = (output.value() as f64) / (input.value() as f64);
1099            let amount_hops = trace
1100                .iter()
1101                .map(|x| BigDecimal::from(x.amount.value()))
1102                .collect::<Vec<_>>();
1103            let position_id_hops: Vec<[u8; 32]> = vec![];
1104            let asset_hops = trace
1105                .iter()
1106                .map(|x| x.asset_id.to_bytes())
1107                .collect::<Vec<_>>();
1108
1109            sqlx::query(
1110                "INSERT INTO dex_ex_batch_swap_traces (
1111                height,
1112                time,
1113                input,
1114                output,
1115                batch_input,
1116                batch_output,
1117                price_float,
1118                asset_start,
1119                asset_end,
1120                asset_hops,
1121                amount_hops,
1122               position_id_hops
1123            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1124            )
1125            .bind(height)
1126            .bind(time)
1127            .bind(BigDecimal::from(input.value()))
1128            .bind(BigDecimal::from(output.value()))
1129            .bind(BigDecimal::from(batch_input.value()))
1130            .bind(BigDecimal::from(batch_output.value()))
1131            .bind(price_float)
1132            .bind(asset_start.to_bytes())
1133            .bind(asset_end.to_bytes())
1134            .bind(asset_hops)
1135            .bind(amount_hops)
1136            .bind(position_id_hops)
1137            .execute(dbtx.as_mut())
1138            .await?;
1139        }
1140
1141        Ok(())
1142    }
1143
1144    async fn record_block_summary(
1145        &self,
1146        dbtx: &mut PgTransaction<'_>,
1147        time: DateTime,
1148        height: i32,
1149        events: &Events,
1150    ) -> anyhow::Result<()> {
1151        let num_opened_lps = events.position_opens.len() as i32;
1152        let num_closed_lps = events.position_closes.len() as i32;
1153        let num_withdrawn_lps = events.position_withdrawals.len() as i32;
1154        let num_swaps = events.swaps.iter().map(|(_, v)| v.len()).sum::<usize>() as i32;
1155        let num_swap_claims = events
1156            .swap_claims
1157            .iter()
1158            .map(|(_, v)| v.len())
1159            .sum::<usize>() as i32;
1160        let num_txs = events.batch_swaps.len() as i32;
1161
1162        let mut batch_swap_summaries = Vec::<BatchSwapSummary>::new();
1163
1164        for event in &events.batch_swaps {
1165            let trading_pair = event.batch_swap_output_data.trading_pair;
1166
1167            if let Some(swap_1_2) = &event.swap_execution_1_for_2 {
1168                let asset_start = swap_1_2.input.asset_id;
1169                let asset_end = swap_1_2.output.asset_id;
1170                let input = swap_1_2.input.amount;
1171                let output = swap_1_2.output.amount;
1172                let price_float = (output.value() as f64) / (input.value() as f64);
1173
1174                let empty_vec = vec![];
1175                let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1176                let filtered_swaps: Vec<_> = swaps_for_pair
1177                    .iter()
1178                    .filter(|swap| swap.delta_1_i != Amount::zero())
1179                    .collect::<Vec<_>>();
1180                let num_swaps = filtered_swaps.len() as i32;
1181
1182                batch_swap_summaries.push(BatchSwapSummary {
1183                    asset_start,
1184                    asset_end,
1185                    input,
1186                    output,
1187                    num_swaps,
1188                    price_float,
1189                });
1190            }
1191
1192            if let Some(swap_2_1) = &event.swap_execution_2_for_1 {
1193                let asset_start = swap_2_1.input.asset_id;
1194                let asset_end = swap_2_1.output.asset_id;
1195                let input = swap_2_1.input.amount;
1196                let output = swap_2_1.output.amount;
1197                let price_float = (output.value() as f64) / (input.value() as f64);
1198
1199                let empty_vec = vec![];
1200                let swaps_for_pair = events.swaps.get(&trading_pair).unwrap_or(&empty_vec);
1201                let filtered_swaps: Vec<_> = swaps_for_pair
1202                    .iter()
1203                    .filter(|swap| swap.delta_2_i != Amount::zero())
1204                    .collect::<Vec<_>>();
1205                let num_swaps = filtered_swaps.len() as i32;
1206
1207                batch_swap_summaries.push(BatchSwapSummary {
1208                    asset_start,
1209                    asset_end,
1210                    input,
1211                    output,
1212                    num_swaps,
1213                    price_float,
1214                });
1215            }
1216        }
1217
1218        sqlx::query(
1219            "INSERT INTO dex_ex_block_summary (
1220            height,
1221            time,
1222            batch_swaps,
1223            num_open_lps,
1224            num_closed_lps,
1225            num_withdrawn_lps,
1226            num_swaps,
1227            num_swap_claims,
1228            num_txs
1229        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
1230        )
1231        .bind(height)
1232        .bind(time)
1233        .bind(serde_json::to_value(&batch_swap_summaries)?)
1234        .bind(num_opened_lps)
1235        .bind(num_closed_lps)
1236        .bind(num_withdrawn_lps)
1237        .bind(num_swaps)
1238        .bind(num_swap_claims)
1239        .bind(num_txs)
1240        .execute(dbtx.as_mut())
1241        .await?;
1242
1243        Ok(())
1244    }
1245
1246    async fn record_batch_swap_traces(
1247        &self,
1248        dbtx: &mut PgTransaction<'_>,
1249        time: DateTime,
1250        height: i32,
1251        event: &EventBatchSwap,
1252    ) -> anyhow::Result<()> {
1253        let EventBatchSwap {
1254            batch_swap_output_data: _,
1255            swap_execution_1_for_2,
1256            swap_execution_2_for_1,
1257        } = event;
1258
1259        if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
1260            self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
1261                .await?;
1262        }
1263
1264        if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
1265            self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
1266                .await?;
1267        }
1268
1269        Ok(())
1270    }
1271
1272    async fn record_position_execution(
1273        &self,
1274        dbtx: &mut PgTransaction<'_>,
1275        time: DateTime,
1276        height: i32,
1277        event: &EventPositionExecution,
1278        positions: &BTreeMap<PositionId, Position>,
1279    ) -> anyhow::Result<()> {
1280        // Get the position that was executed against
1281        let position = positions
1282            .get(&event.position_id)
1283            .expect("position must exist for execution");
1284        let current = Reserves {
1285            r1: event.reserves_1,
1286            r2: event.reserves_2,
1287        };
1288        let prev = Reserves {
1289            r1: event.prev_reserves_1,
1290            r2: event.prev_reserves_2,
1291        };
1292        let flows = Flows::from_phi_and_reserves(&position.phi, &current, &prev);
1293
1294        // First insert the reserves and get the rowid
1295        let reserves_rowid = sqlx::query_scalar::<_, i32>(
1296            "INSERT INTO dex_ex_position_reserves (
1297                position_id,
1298                height,
1299                time,
1300                reserves_1,
1301                reserves_2
1302            ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid",
1303        )
1304        .bind(event.position_id.0)
1305        .bind(height)
1306        .bind(time)
1307        .bind(BigDecimal::from(event.reserves_1.value()))
1308        .bind(BigDecimal::from(event.reserves_2.value()))
1309        .fetch_one(dbtx.as_mut())
1310        .await?;
1311
1312        // Then record the execution with the reserves_rowid
1313        sqlx::query(
1314            "INSERT INTO dex_ex_position_executions (
1315                position_id,
1316                height,
1317                time,
1318                reserves_rowid,
1319                delta_1,
1320                delta_2,
1321                lambda_1,
1322                lambda_2,
1323                fee_1,
1324                fee_2,
1325                context_asset_start,
1326                context_asset_end
1327            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
1328        )
1329        .bind(event.position_id.0)
1330        .bind(height)
1331        .bind(time)
1332        .bind(reserves_rowid)
1333        .bind(BigDecimal::from(flows.delta_1().value()))
1334        .bind(BigDecimal::from(flows.delta_2().value()))
1335        .bind(BigDecimal::from(flows.lambda_1().value()))
1336        .bind(BigDecimal::from(flows.lambda_2().value()))
1337        .bind(BigDecimal::from(flows.fee_1().value()))
1338        .bind(BigDecimal::from(flows.fee_2().value()))
1339        .bind(event.context.start.to_bytes())
1340        .bind(event.context.end.to_bytes())
1341        .execute(dbtx.as_mut())
1342        .await?;
1343
1344        Ok(())
1345    }
1346
1347    async fn record_position_close(
1348        &self,
1349        dbtx: &mut PgTransaction<'_>,
1350        time: DateTime,
1351        height: i32,
1352        tx_hash: Option<[u8; 32]>,
1353        event: &EventPositionClose,
1354    ) -> anyhow::Result<()> {
1355        sqlx::query(
1356            "UPDATE dex_ex_position_state 
1357             SET closing_time = $1,
1358                 closing_height = $2,
1359                 closing_tx = $3
1360             WHERE position_id = $4",
1361        )
1362        .bind(time)
1363        .bind(height)
1364        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1365        .bind(event.position_id.0)
1366        .execute(dbtx.as_mut())
1367        .await?;
1368
1369        Ok(())
1370    }
1371
1372    async fn record_position_withdraw(
1373        &self,
1374        dbtx: &mut PgTransaction<'_>,
1375        time: DateTime,
1376        height: i32,
1377        tx_hash: Option<[u8; 32]>,
1378        event: &EventPositionWithdraw,
1379    ) -> anyhow::Result<()> {
1380        // First insert the final reserves state (zeros after withdrawal)
1381        let reserves_rowid = sqlx::query_scalar::<_, i32>(
1382            "INSERT INTO dex_ex_position_reserves (
1383                position_id,
1384                height,
1385                time,
1386                reserves_1,
1387                reserves_2
1388            ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", // Using $4 twice for zero values
1389        )
1390        .bind(event.position_id.0)
1391        .bind(height)
1392        .bind(time)
1393        .bind(BigDecimal::from(0)) // Both reserves become zero after withdrawal
1394        .fetch_one(dbtx.as_mut())
1395        .await?;
1396
1397        sqlx::query(
1398            "INSERT INTO dex_ex_position_withdrawals (
1399                position_id,
1400                height,
1401                time,
1402                withdrawal_tx,
1403                sequence,
1404                reserves_1,
1405                reserves_2,
1406                reserves_rowid
1407            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
1408        )
1409        .bind(event.position_id.0)
1410        .bind(height)
1411        .bind(time)
1412        .bind(tx_hash.map(|h| h.as_ref().to_vec()))
1413        .bind(event.sequence as i32)
1414        .bind(BigDecimal::from(event.reserves_1.value()))
1415        .bind(BigDecimal::from(event.reserves_2.value()))
1416        .bind(reserves_rowid)
1417        .execute(dbtx.as_mut())
1418        .await?;
1419
1420        Ok(())
1421    }
1422
1423    async fn record_transaction(
1424        &self,
1425        dbtx: &mut PgTransaction<'_>,
1426        time: DateTime,
1427        height: u64,
1428        transaction_id: [u8; 32],
1429        transaction: Transaction,
1430    ) -> anyhow::Result<()> {
1431        if transaction.transaction_body.actions.is_empty() {
1432            return Ok(());
1433        }
1434        sqlx::query(
1435            "INSERT INTO dex_ex_transactions (
1436                transaction_id,
1437                transaction,
1438                height,
1439                time
1440            ) VALUES ($1, $2, $3, $4)
1441            ",
1442        )
1443        .bind(transaction_id)
1444        .bind(transaction.encode_to_vec())
1445        .bind(i32::try_from(height)?)
1446        .bind(time)
1447        .execute(dbtx.as_mut())
1448        .await?;
1449
1450        Ok(())
1451    }
1452
1453    async fn record_all_transactions(
1454        &self,
1455        dbtx: &mut PgTransaction<'_>,
1456        time: DateTime,
1457        block: &BlockEvents,
1458    ) -> anyhow::Result<()> {
1459        for (tx_id, tx_bytes) in block.transactions() {
1460            let tx = Transaction::try_from(tx_bytes)?;
1461            let height = block.height();
1462            self.record_transaction(dbtx, time, height, tx_id, tx)
1463                .await?;
1464        }
1465        Ok(())
1466    }
1467}
1468
1469#[async_trait]
1470impl AppView for Component {
1471    async fn init_chain(
1472        &self,
1473        _dbtx: &mut PgTransaction,
1474        _: &serde_json::Value,
1475    ) -> Result<(), anyhow::Error> {
1476        Ok(())
1477    }
1478
1479    fn name(&self) -> String {
1480        "dex_ex".to_string()
1481    }
1482
1483    fn version(&self) -> Version {
1484        let hash: [u8; 32] = blake2b_simd::Params::default()
1485            .personal(b"option_hash")
1486            .hash_length(32)
1487            .to_state()
1488            .update(&self.denom.to_bytes())
1489            .update(&self.min_liquidity.to_le_bytes())
1490            .update(&[u8::from(self.ignore_arb_executions)])
1491            .finalize()
1492            .as_bytes()
1493            .try_into()
1494            .expect("Impossible 000-001: expected 32 byte hash");
1495        Version::with_major(1).with_option_hash(hash)
1496    }
1497
1498    async fn reset(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1499        for statement in include_str!("reset.sql").split(";") {
1500            sqlx::query(statement).execute(dbtx.as_mut()).await?;
1501        }
1502        Ok(())
1503    }
1504
1505    async fn on_startup(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
1506        for statement in include_str!("schema.sql").split(";") {
1507            sqlx::query(statement).execute(dbtx.as_mut()).await?;
1508        }
1509        Ok(())
1510    }
1511
1512    async fn index_batch(
1513        &self,
1514        dbtx: &mut PgTransaction,
1515        batch: EventBatch,
1516        ctx: EventBatchContext,
1517    ) -> Result<(), anyhow::Error> {
1518        metadata::set(dbtx, self.denom).await?;
1519        let mut charts = HashMap::new();
1520        let mut snapshots = HashMap::new();
1521        let mut last_time = None;
1522        for block in batch.events_by_block() {
1523            let mut events = Events::extract(&block, self.ignore_arb_executions)?;
1524            let time = events
1525                .time
1526                .expect(&format!("no block root event at height {}", block.height()));
1527            last_time = Some(time);
1528
1529            self.record_all_transactions(dbtx, time, block).await?;
1530
1531            // Load any missing positions before processing events
1532            events.load_positions(dbtx).await?;
1533
1534            // This is where we are going to build the block summary for the DEX.
1535            self.record_block_summary(dbtx, time, block.height() as i32, &events)
1536                .await?;
1537
1538            // Record batch swap execution traces.
1539            for event in &events.batch_swaps {
1540                self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
1541                    .await?;
1542            }
1543
1544            // Record position opens
1545            for event in &events.position_opens {
1546                let tx_hash = events.position_open_txs.get(&event.position_id).copied();
1547                self.record_position_open(dbtx, time, events.height, tx_hash, event)
1548                    .await?;
1549            }
1550
1551            // Process position executions
1552            for event in &events.position_executions {
1553                self.record_position_execution(dbtx, time, events.height, event, &events.positions)
1554                    .await?;
1555            }
1556
1557            // Record position closes
1558            for event in &events.position_closes {
1559                let tx_hash = events.position_close_txs.get(&event.position_id).copied();
1560                self.record_position_close(dbtx, time, events.height, tx_hash, event)
1561                    .await?;
1562            }
1563
1564            // Record position withdrawals
1565            for event in &events.position_withdrawals {
1566                let tx_hash = events
1567                    .position_withdrawal_txs
1568                    .get(&event.position_id)
1569                    .copied();
1570                self.record_position_withdraw(dbtx, time, events.height, tx_hash, event)
1571                    .await?;
1572            }
1573
1574            for (pair, candle) in &events.candles {
1575                sqlx::query(
1576                    "INSERT INTO dex_ex.candles VALUES (DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10)",
1577                )
1578                .bind(i64::try_from(block.height())?)
1579                .bind(time)
1580                .bind(pair.start.to_bytes())
1581                .bind(pair.end.to_bytes())
1582                .bind(candle.open)
1583                .bind(candle.close)
1584                .bind(candle.low)
1585                .bind(candle.high)
1586                .bind(candle.direct_volume)
1587                .bind(candle.swap_volume)
1588                .execute(dbtx.as_mut())
1589                .await?;
1590                for window in Window::all() {
1591                    let key = (pair.start, pair.end, window);
1592                    if !charts.contains_key(&key) {
1593                        let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?;
1594                        charts.insert(key, ctx);
1595                    }
1596                    charts
1597                        .get_mut(&key)
1598                        .unwrap() // safe because we just inserted above
1599                        .update(dbtx, time, *candle)
1600                        .await?;
1601                }
1602            }
1603
1604            let block_pairs = events
1605                .candles
1606                .keys()
1607                .chain(events.metrics.keys())
1608                .copied()
1609                .collect::<HashSet<_>>();
1610            for pair in block_pairs {
1611                if !snapshots.contains_key(&pair) {
1612                    let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?;
1613                    snapshots.insert(pair, ctx);
1614                }
1615                // NOPANIC: inserted above
1616                snapshots
1617                    .get_mut(&pair)
1618                    .unwrap()
1619                    .update(
1620                        dbtx,
1621                        time,
1622                        events.candles.get(&pair).copied(),
1623                        events.metrics.get(&pair).copied().unwrap_or_default(),
1624                        events.price_for(self.denom, pair.start),
1625                    )
1626                    .await?;
1627            }
1628        }
1629
1630        if ctx.is_last() {
1631            if let Some(now) = last_time {
1632                for window in Window::all() {
1633                    summary::update_summaries(dbtx, now, window).await?;
1634                    summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity)
1635                        .await?;
1636                }
1637            }
1638        }
1639        for chart in charts.into_values() {
1640            chart.unload(dbtx).await?;
1641        }
1642        Ok(())
1643    }
1644}