pindexer/
supply.rs

1use std::collections::BTreeMap;
2
3use anyhow::{anyhow, Result};
4use cometindex::{
5    async_trait,
6    index::{EventBatch, EventBatchContext},
7    sqlx, AppView, ContextualizedEvent, PgTransaction,
8};
9use penumbra_sdk_app::genesis::Content;
10use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
11use penumbra_sdk_num::Amount;
12use penumbra_sdk_proto::{
13    event::ProtoEvent,
14    penumbra::core::component::{
15        auction::v1 as pb_auction, dex::v1 as pb_dex, fee::v1 as pb_fee, funding::v1 as pb_funding,
16        stake::v1 as pb_stake,
17    },
18};
19use penumbra_sdk_stake::{rate::RateData, validator::Validator, IdentityKey};
20use sqlx::{Postgres, Transaction};
21use std::iter;
22
23use crate::parsing::parse_content;
24
25mod unstaked_supply {
26    //! This module handles updates around the unstaked supply.
27    use anyhow::Result;
28    use cometindex::PgTransaction;
29
30    /// Initialize the database tables for this module.
31    pub async fn init_db(dbtx: &mut PgTransaction<'_>) -> Result<()> {
32        sqlx::query(
33            r#"
34        CREATE TABLE IF NOT EXISTS supply_total_unstaked (
35            height BIGINT PRIMARY KEY,
36            um BIGINT NOT NULL,
37            auction BIGINT NOT NULL,
38            dex BIGINT NOT NULL,
39            arb BIGINT NOT NULL,
40            fees BIGINT NOT NULL
41        );
42        "#,
43        )
44        .execute(dbtx.as_mut())
45        .await?;
46        Ok(())
47    }
48
49    /// The supply of unstaked tokens, in various components.
50    #[derive(Clone, Copy, Debug, Default, PartialEq)]
51    pub struct Supply {
52        /// The supply that's not locked in any component.
53        pub um: u64,
54        /// The supply locked in the auction component.
55        pub auction: i64,
56        /// The supply locked in the dex component.
57        pub dex: i64,
58        /// The supply which has been (forever) locked away after arb.
59        pub arb: u64,
60        /// The supply which has been (forever) locked away as paid fees.
61        pub fees: u64,
62    }
63
64    /// Get the supply for at a given height.
65    async fn get_supply(dbtx: &mut PgTransaction<'_>, height: u64) -> Result<Option<Supply>> {
66        let row: Option<(i64, i64, i64, i64, i64)> = sqlx::query_as(
67            "SELECT um, auction, dex, arb, fees FROM supply_total_unstaked WHERE height <= $1 ORDER BY height DESC LIMIT 1",
68        )
69        .bind(i64::try_from(height)?)
70        .fetch_optional(dbtx.as_mut())
71        .await?;
72        match row {
73            None => Ok(None),
74            Some((um, auction, dex, arb, fees)) => Ok(Some(Supply {
75                um: um.try_into()?,
76                auction: auction.try_into()?,
77                dex: dex.try_into()?,
78                arb: arb.try_into()?,
79                fees: fees.try_into()?,
80            })),
81        }
82    }
83
84    /// Set the supply at a given height.
85    async fn set_supply(dbtx: &mut PgTransaction<'_>, height: u64, supply: Supply) -> Result<()> {
86        sqlx::query(
87            r#"
88        INSERT INTO
89            supply_total_unstaked
90        VALUES ($1, $2, $3, $4, $5, $6) 
91        ON CONFLICT (height)
92        DO UPDATE SET
93            um = excluded.um,
94            auction = excluded.auction,
95            dex = excluded.dex,
96            arb = excluded.arb,
97            fees = excluded.fees
98        "#,
99        )
100        .bind(i64::try_from(height)?)
101        .bind(i64::try_from(supply.um)?)
102        .bind(i64::try_from(supply.auction)?)
103        .bind(i64::try_from(supply.dex)?)
104        .bind(i64::try_from(supply.arb)?)
105        .bind(i64::try_from(supply.fees)?)
106        .execute(dbtx.as_mut())
107        .await?;
108        Ok(())
109    }
110
111    /// Modify the supply at a given height.
112    ///
113    /// This will take the supply at the given height, and replace it with the
114    /// new result produced by the function.
115    pub async fn modify(
116        dbtx: &mut PgTransaction<'_>,
117        height: u64,
118        f: impl FnOnce(Option<Supply>) -> Result<Supply>,
119    ) -> Result<()> {
120        let supply = get_supply(dbtx, height).await?;
121        // tracing::warn!(?supply, "supply");
122        let new_supply = f(supply)?;
123        // tracing::warn!(?new_supply, "new_supply");
124        set_supply(dbtx, height, new_supply).await
125    }
126}
127
128mod delegated_supply {
129    //! This module handles updates around the delegated supply to a validator.
130    use anyhow::{anyhow, Result};
131    use cometindex::PgTransaction;
132    use penumbra_sdk_num::fixpoint::U128x128;
133    use penumbra_sdk_stake::{rate::RateData, IdentityKey};
134
135    const BPS_SQUARED: u64 = 1_0000_0000u64;
136
137    /// Represents the supply around a given validator.
138    ///
139    /// The supply needs to track the amount of delegated tokens to that validator,
140    /// as well as the conversion rate from those tokens to the native token.
141    #[derive(Clone, Copy)]
142    pub struct Supply {
143        um: u64,
144        del_um: u64,
145        rate_bps2: u64,
146    }
147
148    impl Default for Supply {
149        fn default() -> Self {
150            Self {
151                um: 0,
152                del_um: 0,
153                rate_bps2: BPS_SQUARED,
154            }
155        }
156    }
157
158    impl Supply {
159        /// Change the amount of um in this supply, by adding or removing um.
160        pub fn add_um(self, delta: i64) -> Result<Self> {
161            let rate = U128x128::ratio(self.rate_bps2, BPS_SQUARED)?;
162            let negate = delta.is_negative();
163            let delta = delta.unsigned_abs();
164            let um_delta = delta;
165            let del_um_delta = if rate == U128x128::from(0u128) {
166                0u64
167            } else {
168                let del_um_delta = (U128x128::from(delta) / rate)?;
169                let rounded = if negate {
170                    // So that we don't remove too few del_um
171                    del_um_delta.round_up()?
172                } else {
173                    // So that we don't add too many del_um
174                    del_um_delta.round_down()
175                };
176                rounded.try_into()?
177            };
178            let out = if negate {
179                Self {
180                    um: self
181                        .um
182                        .checked_sub(um_delta)
183                        .ok_or(anyhow!("supply modification failed"))?,
184                    del_um: self
185                        .del_um
186                        .checked_sub(del_um_delta)
187                        .ok_or(anyhow!("supply modification failed"))?,
188                    rate_bps2: self.rate_bps2,
189                }
190            } else {
191                Self {
192                    um: self
193                        .um
194                        .checked_add(um_delta)
195                        .ok_or(anyhow!("supply modification failed"))?,
196                    del_um: self
197                        .del_um
198                        .checked_add(del_um_delta)
199                        .ok_or(anyhow!("supply modification failed"))?,
200                    rate_bps2: self.rate_bps2,
201                }
202            };
203            Ok(out)
204        }
205
206        /// Change the conversion rate between delegated_um and um in this supply.
207        pub fn change_rate(self, rate: &RateData) -> Result<Self> {
208            let um = rate
209                .unbonded_amount(self.del_um.into())
210                .value()
211                .try_into()?;
212
213            Ok(Self {
214                um,
215                del_um: self.del_um,
216                rate_bps2: rate.validator_exchange_rate.value().try_into()?,
217            })
218        }
219    }
220
221    /// Initialize the database tables for this module.
222    pub async fn init_db<'d>(dbtx: &mut PgTransaction<'d>) -> Result<()> {
223        sqlx::query(
224            r#"
225        CREATE TABLE IF NOT EXISTS supply_validators (
226            id SERIAL PRIMARY KEY,
227            identity_key TEXT NOT NULL
228        );
229        "#,
230        )
231        .execute(dbtx.as_mut())
232        .await?;
233        sqlx::query(
234            r#"
235        CREATE TABLE IF NOT EXISTS supply_total_staked (
236            validator_id INT REFERENCES supply_validators(id),
237            height BIGINT NOT NULL,
238            um BIGINT NOT NULL,
239            del_um BIGINT NOT NULL,
240            rate_bps2 BIGINT NOT NULL,
241            PRIMARY KEY (validator_id, height)
242        );
243        "#,
244        )
245        .execute(dbtx.as_mut())
246        .await?;
247        Ok(())
248    }
249
250    /// An opaque internal identifier for a given validator.
251    #[derive(Clone, Copy, PartialEq)]
252    pub struct ValidatorID(i32);
253
254    /// Define a validator, returning its internal ID.
255    ///
256    /// This will have no effect if the validator has already been defined.
257    pub async fn define_validator(
258        dbtx: &mut PgTransaction<'_>,
259        identity_key: &IdentityKey,
260    ) -> Result<ValidatorID> {
261        let ik_string = identity_key.to_string();
262
263        let id: Option<i32> =
264            sqlx::query_scalar(r#"SELECT id FROM supply_validators WHERE identity_key = $1"#)
265                .bind(&ik_string)
266                .fetch_optional(dbtx.as_mut())
267                .await?;
268
269        if let Some(id) = id {
270            return Ok(ValidatorID(id));
271        }
272        let id = sqlx::query_scalar(
273            r#"INSERT INTO supply_validators VALUES (DEFAULT, $1) RETURNING id"#,
274        )
275        .bind(&ik_string)
276        .fetch_one(dbtx.as_mut())
277        .await?;
278        Ok(ValidatorID(id))
279    }
280
281    /// Get the supply for a given validator at a given height.
282    async fn get_supply(
283        dbtx: &mut PgTransaction<'_>,
284        validator: ValidatorID,
285        height: u64,
286    ) -> Result<Option<Supply>> {
287        let row: Option<(i64, i64, i64)> = sqlx::query_as(
288            r#"
289            SELECT
290                um, del_um, rate_bps2
291            FROM
292                supply_total_staked
293            WHERE
294                validator_id = $1 AND height <= $2
295            ORDER BY height DESC
296            LIMIT 1
297        "#,
298        )
299        .bind(validator.0)
300        .bind(i64::try_from(height)?)
301        .fetch_optional(dbtx.as_mut())
302        .await?;
303        row.map(|(um, del_um, rate_bps2)| {
304            let um = um.try_into()?;
305            let del_um = del_um.try_into()?;
306            let rate_bps2 = rate_bps2.try_into()?;
307            Ok(Supply {
308                um,
309                del_um,
310                rate_bps2,
311            })
312        })
313        .transpose()
314    }
315
316    /// Set the supply for a given validator at a given height.
317    async fn set_supply(
318        dbtx: &mut PgTransaction<'_>,
319        validator: ValidatorID,
320        height: u64,
321        supply: Supply,
322    ) -> Result<()> {
323        sqlx::query(
324            r#"
325        INSERT INTO
326            supply_total_staked
327        VALUES ($1, $2, $3, $4, $5) 
328        ON CONFLICT (validator_id, height)
329        DO UPDATE SET
330            um = excluded.um,
331            del_um = excluded.del_um,
332            rate_bps2 = excluded.rate_bps2
333        "#,
334        )
335        .bind(validator.0)
336        .bind(i64::try_from(height)?)
337        .bind(i64::try_from(supply.um)?)
338        .bind(i64::try_from(supply.del_um)?)
339        .bind(i64::try_from(supply.rate_bps2)?)
340        .execute(dbtx.as_mut())
341        .await?;
342        Ok(())
343    }
344
345    /// Modify the supply for a given validator, at a given height.
346    pub async fn modify(
347        dbtx: &mut PgTransaction<'_>,
348        validator: ValidatorID,
349        height: u64,
350        f: impl FnOnce(Option<Supply>) -> Result<Supply>,
351    ) -> Result<()> {
352        let supply = get_supply(dbtx, validator, height).await?;
353        let new_supply = f(supply)?;
354        set_supply(dbtx, validator, height, new_supply).await
355    }
356}
357
358/// Supply-relevant events.
359/// The supply of the native staking token can change:
360/// - When notes are minted (e.g., during initial genesis, or as a result of
361/// IBC, though in the case of IBC the circuit breaker should never allow more
362/// inbound UM to be minted than outbound um were originally sent.)
363/// - As a result of claiming delegation tokens that have increased in
364/// underlying UM value due to accumulating the staking rate.
365/// - As a result of burning UM which can happen due to arbs, fees, and slashing.
366#[derive(Clone, Debug)]
367enum Event {
368    /// A parsed version of [pb::EventUndelegate]
369    Undelegate {
370        height: u64,
371        identity_key: IdentityKey,
372        unbonded_amount: Amount,
373    },
374    /// A parsed version of [pb::EventDelegate]
375    Delegate {
376        height: u64,
377        identity_key: IdentityKey,
378        amount: Amount,
379    },
380    /// A parsed version of [pb::EventFundingStreamReward]
381    FundingStreamReward { height: u64, reward_amount: Amount },
382    /// A parsed version of EventRateDataChange
383    RateDataChange {
384        height: u64,
385        identity_key: IdentityKey,
386        rate_data: RateData,
387    },
388    /// A parsed version of [auction::EventValueCircuitBreakerCredit]
389    AuctionVCBCredit {
390        height: u64,
391        asset_id: asset::Id,
392        previous_balance: Amount,
393        new_balance: Amount,
394    },
395    /// A parsed version of [auction::EventValueCircuitBreakerDebit]
396    AuctionVCBDebit {
397        height: u64,
398        asset_id: asset::Id,
399        previous_balance: Amount,
400        new_balance: Amount,
401    },
402    /// A parsed version of [dex::EventValueCircuitBreakerCredit]
403    DexVCBCredit {
404        height: u64,
405        asset_id: asset::Id,
406        previous_balance: Amount,
407        new_balance: Amount,
408    },
409    /// A parsed version of [dex::EventValueCircuitBreakerDebit]
410    DexVCBDebit {
411        height: u64,
412        asset_id: asset::Id,
413        previous_balance: Amount,
414        new_balance: Amount,
415    },
416    DexArb {
417        height: u64,
418        swap_execution: penumbra_sdk_dex::SwapExecution,
419    },
420    BlockFees {
421        height: u64,
422        total: penumbra_sdk_fee::Fee,
423    },
424}
425
426impl Event {
427    const NAMES: [&'static str; 10] = [
428        "penumbra.core.component.stake.v1.EventUndelegate",
429        "penumbra.core.component.stake.v1.EventDelegate",
430        "penumbra.core.component.funding.v1.EventFundingStreamReward",
431        "penumbra.core.component.stake.v1.EventRateDataChange",
432        "penumbra.core.component.auction.v1.EventValueCircuitBreakerCredit",
433        "penumbra.core.component.auction.v1.EventValueCircuitBreakerDebit",
434        "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit",
435        "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit",
436        "penumbra.core.component.dex.v1.EventArbExecution",
437        "penumbra.core.component.fee.v1.EventBlockFees",
438    ];
439
440    async fn index<'d>(&self, dbtx: &mut Transaction<'d, Postgres>) -> anyhow::Result<()> {
441        match self {
442            Event::Delegate {
443                height,
444                identity_key,
445                amount,
446            } => {
447                let amount = i64::try_from(amount.value())?;
448
449                unstaked_supply::modify(dbtx, *height, |current| {
450                    let current = current.unwrap_or_default();
451                    Ok(unstaked_supply::Supply {
452                        um: current.um - amount as u64,
453                        ..current
454                    })
455                })
456                .await?;
457
458                let validator = delegated_supply::define_validator(dbtx, identity_key).await?;
459                delegated_supply::modify(dbtx, validator, *height, |current| {
460                    current.unwrap_or_default().add_um(amount)
461                })
462                .await
463            }
464            Event::Undelegate {
465                height,
466                identity_key,
467                unbonded_amount,
468            } => {
469                let amount = i64::try_from(unbonded_amount.value())?;
470
471                unstaked_supply::modify(dbtx, *height, |current| {
472                    let current = current.unwrap_or_default();
473                    Ok(unstaked_supply::Supply {
474                        um: current.um + amount as u64,
475                        ..current
476                    })
477                })
478                .await?;
479
480                let validator = delegated_supply::define_validator(dbtx, identity_key).await?;
481                delegated_supply::modify(dbtx, validator, *height, |current| {
482                    current.unwrap_or_default().add_um(-amount)
483                })
484                .await
485            }
486            Event::FundingStreamReward {
487                height,
488                reward_amount,
489            } => {
490                let amount = u64::try_from(reward_amount.value())?;
491
492                unstaked_supply::modify(dbtx, *height, |current| {
493                    let current = current.unwrap_or_default();
494                    Ok(unstaked_supply::Supply {
495                        um: current.um + amount as u64,
496                        ..current
497                    })
498                })
499                .await
500            }
501            Event::RateDataChange {
502                height,
503                identity_key,
504                rate_data,
505            } => {
506                let validator = delegated_supply::define_validator(dbtx, identity_key).await?;
507                delegated_supply::modify(dbtx, validator, *height, |current| {
508                    current.unwrap_or_default().change_rate(rate_data)
509                })
510                .await
511            }
512            Event::AuctionVCBCredit {
513                height,
514                asset_id,
515                previous_balance,
516                new_balance,
517            } => {
518                if *asset_id != *STAKING_TOKEN_ASSET_ID {
519                    return Ok(());
520                }
521
522                let added = i64::try_from(new_balance.value() - previous_balance.value())?;
523                unstaked_supply::modify(dbtx, *height, |current| {
524                    let current = current.unwrap_or_default();
525                    Ok(unstaked_supply::Supply {
526                        um: current.um - added as u64,
527                        auction: current.auction.checked_add(added).ok_or(anyhow!(format!(
528                            "AuctionVCB overflow: {} + {}",
529                            current.auction, added
530                        )))?,
531                        ..current
532                    })
533                })
534                .await
535            }
536            Event::AuctionVCBDebit {
537                height,
538                asset_id,
539                previous_balance,
540                new_balance,
541            } => {
542                if *asset_id != *STAKING_TOKEN_ASSET_ID {
543                    return Ok(());
544                }
545
546                let removed = i64::try_from(previous_balance.value() - new_balance.value())?;
547                unstaked_supply::modify(dbtx, *height, |current| {
548                    let current = current.unwrap_or_default();
549                    Ok(unstaked_supply::Supply {
550                        um: current.um + removed as u64,
551                        auction: current.auction.checked_sub(removed).ok_or(anyhow!(format!(
552                            "AuctionVCB underflow: {} - {}",
553                            current.auction, removed
554                        )))?,
555                        ..current
556                    })
557                })
558                .await
559            }
560            Event::DexVCBCredit {
561                height,
562                asset_id,
563                previous_balance,
564                new_balance,
565            } => {
566                if *asset_id != *STAKING_TOKEN_ASSET_ID {
567                    return Ok(());
568                }
569
570                let added = i64::try_from(new_balance.value() - previous_balance.value())?;
571                unstaked_supply::modify(dbtx, *height, |current| {
572                    let current = current.unwrap_or_default();
573                    Ok(unstaked_supply::Supply {
574                        um: current.um - added as u64,
575                        dex: current.dex.checked_sub(added).ok_or(anyhow!(format!(
576                            "DexVCB overflow: {} + {}",
577                            current.dex, added
578                        )))?,
579                        ..current
580                    })
581                })
582                .await
583            }
584            Event::DexVCBDebit {
585                height,
586                asset_id,
587                previous_balance,
588                new_balance,
589            } => {
590                if *asset_id != *STAKING_TOKEN_ASSET_ID {
591                    return Ok(());
592                }
593
594                let removed = i64::try_from(previous_balance.value() - new_balance.value())?;
595                unstaked_supply::modify(dbtx, *height, |current| {
596                    let current = current.unwrap_or_default();
597                    Ok(unstaked_supply::Supply {
598                        um: current.um + removed as u64,
599                        dex: current.dex.checked_add(removed).ok_or(anyhow!(format!(
600                            "DexVCB underflow: {} - {}",
601                            current.dex, removed
602                        )))?,
603                        ..current
604                    })
605                })
606                .await
607            }
608            Event::DexArb {
609                height,
610                swap_execution,
611            } => {
612                let input = swap_execution.input;
613                let output = swap_execution.output;
614                // Ignore any arb event not from the staking token to itself.
615                if input.asset_id != output.asset_id || input.asset_id != *STAKING_TOKEN_ASSET_ID {
616                    return Ok(());
617                }
618
619                let profit = u64::try_from((output.amount - input.amount).value())?;
620                unstaked_supply::modify(dbtx, *height, |current| {
621                    let current = current.unwrap_or_default();
622                    Ok(unstaked_supply::Supply {
623                        um: current.um - profit,
624                        arb: current.arb + profit,
625                        ..current
626                    })
627                })
628                .await
629            }
630            Event::BlockFees { height, total } => {
631                if total.asset_id() != *STAKING_TOKEN_ASSET_ID {
632                    return Ok(());
633                }
634                let amount = u64::try_from(total.amount().value())?;
635                // This might happen without fees frequently, potentially.
636                if amount == 0 {
637                    return Ok(());
638                }
639                // We consider the tip to be destroyed too, matching the current logic
640                // DRAGON: if this changes, this code should use the base fee only.
641                unstaked_supply::modify(dbtx, *height, |current| {
642                    let current = current.unwrap_or_default();
643                    Ok(unstaked_supply::Supply {
644                        um: current.um - amount,
645                        fees: current.fees + amount,
646                        ..current
647                    })
648                })
649                .await
650            }
651        }
652    }
653}
654
655impl TryFrom<ContextualizedEvent<'_>> for Event {
656    type Error = anyhow::Error;
657
658    fn try_from(event: ContextualizedEvent<'_>) -> Result<Self, Self::Error> {
659        match event.event.kind.as_str() {
660            // undelegation
661            x if x == Event::NAMES[0] => {
662                let pe = pb_stake::EventUndelegate::from_event(event.as_ref())?;
663                let identity_key = pe
664                    .identity_key
665                    .ok_or(anyhow!("EventUndelegate should contain identity key"))?
666                    .try_into()?;
667                let unbonded_amount = pe
668                    .amount
669                    .ok_or(anyhow!("EventUndelegate should contain amount"))?
670                    .try_into()?;
671                Ok(Self::Undelegate {
672                    height: event.block_height,
673                    identity_key,
674                    unbonded_amount,
675                })
676            }
677            // delegation
678            x if x == Event::NAMES[1] => {
679                let pe = pb_stake::EventDelegate::from_event(event.as_ref())?;
680                let identity_key = pe
681                    .identity_key
682                    .ok_or(anyhow!("EventDelegate should contain identity key"))?
683                    .try_into()?;
684                let amount = pe
685                    .amount
686                    .ok_or(anyhow!("EventDelegate should contain amount"))?
687                    .try_into()?;
688                Ok(Self::Delegate {
689                    height: event.block_height,
690                    identity_key,
691                    amount,
692                })
693            }
694            // funding stream reward
695            x if x == Event::NAMES[2] => {
696                let pe = pb_funding::EventFundingStreamReward::from_event(event.as_ref())?;
697                let reward_amount = Amount::try_from(
698                    pe.reward_amount
699                        .ok_or(anyhow!("event missing in funding stream reward"))?,
700                )?;
701                Ok(Self::FundingStreamReward {
702                    height: event.block_height,
703                    reward_amount,
704                })
705            }
706            // validator rate change
707            x if x == Event::NAMES[3] => {
708                let pe = pb_stake::EventRateDataChange::from_event(event.as_ref())?;
709                let identity_key = pe
710                    .identity_key
711                    .ok_or(anyhow!("EventRateDataChange should contain identity key"))?
712                    .try_into()?;
713                let rate_data = pe
714                    .rate_data
715                    .ok_or(anyhow!("EventRateDataChange should contain rate data"))?
716                    .try_into()?;
717                Ok(Self::RateDataChange {
718                    height: event.block_height,
719                    identity_key,
720                    rate_data,
721                })
722            }
723            // AuctionVCBCredit
724            x if x == Event::NAMES[4] => {
725                let pe = pb_auction::EventValueCircuitBreakerCredit::from_event(event.as_ref())?;
726                let asset_id = pe
727                    .asset_id
728                    .ok_or(anyhow!("AuctionVCBCredit missing asset_id"))?
729                    .try_into()?;
730                let previous_balance = pe
731                    .previous_balance
732                    .ok_or(anyhow!("AuctionVCBCredit missing previous_balance"))?
733                    .try_into()?;
734                let new_balance = pe
735                    .new_balance
736                    .ok_or(anyhow!("AuctionVCBCredit missing previous_balance"))?
737                    .try_into()?;
738                Ok(Self::AuctionVCBCredit {
739                    height: event.block_height,
740                    asset_id,
741                    previous_balance,
742                    new_balance,
743                })
744            }
745            // AuctionVCBDebit
746            x if x == Event::NAMES[5] => {
747                let pe = pb_auction::EventValueCircuitBreakerDebit::from_event(event.as_ref())?;
748                let asset_id = pe
749                    .asset_id
750                    .ok_or(anyhow!("AuctionVCBDebit missing asset_id"))?
751                    .try_into()?;
752                let previous_balance = pe
753                    .previous_balance
754                    .ok_or(anyhow!("AuctionVCBDebit missing previous_balance"))?
755                    .try_into()?;
756                let new_balance = pe
757                    .new_balance
758                    .ok_or(anyhow!("AuctionVCBDebit missing previous_balance"))?
759                    .try_into()?;
760                Ok(Self::AuctionVCBDebit {
761                    height: event.block_height,
762                    asset_id,
763                    previous_balance,
764                    new_balance,
765                })
766            }
767            // DexVCBCredit
768            x if x == Event::NAMES[6] => {
769                let pe = pb_dex::EventValueCircuitBreakerCredit::from_event(event.as_ref())?;
770                let asset_id = pe
771                    .asset_id
772                    .ok_or(anyhow!("DexVCBCredit missing asset_id"))?
773                    .try_into()?;
774                let previous_balance = pe
775                    .previous_balance
776                    .ok_or(anyhow!("DexVCBCredit missing previous_balance"))?
777                    .try_into()?;
778                let new_balance = pe
779                    .new_balance
780                    .ok_or(anyhow!("DexVCBCredit missing previous_balance"))?
781                    .try_into()?;
782                Ok(Self::DexVCBCredit {
783                    height: event.block_height,
784                    asset_id,
785                    previous_balance,
786                    new_balance,
787                })
788            }
789            // DexVCBDebit
790            x if x == Event::NAMES[7] => {
791                let pe = pb_dex::EventValueCircuitBreakerDebit::from_event(event.as_ref())?;
792                let asset_id = pe
793                    .asset_id
794                    .ok_or(anyhow!("DexVCBDebit missing asset_id"))?
795                    .try_into()?;
796                let previous_balance = pe
797                    .previous_balance
798                    .ok_or(anyhow!("DexVCBDebit missing previous_balance"))?
799                    .try_into()?;
800                let new_balance = pe
801                    .new_balance
802                    .ok_or(anyhow!("DexVCBDebit missing previous_balance"))?
803                    .try_into()?;
804                Ok(Self::DexVCBDebit {
805                    height: event.block_height,
806                    asset_id,
807                    previous_balance,
808                    new_balance,
809                })
810            }
811            // DexArb
812            x if x == Event::NAMES[8] => {
813                let pe = pb_dex::EventArbExecution::from_event(event.as_ref())?;
814                let swap_execution = pe
815                    .swap_execution
816                    .ok_or(anyhow!("EventArbExecution missing swap_execution"))?
817                    .try_into()?;
818                Ok(Self::DexArb {
819                    height: event.block_height,
820                    swap_execution,
821                })
822            }
823            // BlockFees
824            x if x == Event::NAMES[9] => {
825                let pe = pb_fee::EventBlockFees::from_event(event.as_ref())?;
826                let total = pe
827                    .swapped_fee_total
828                    .ok_or(anyhow!("EventBlockFees missing swapped_fee_total"))?
829                    .try_into()?;
830                Ok(Self::BlockFees {
831                    height: event.block_height,
832                    total,
833                })
834            }
835            x => Err(anyhow!(format!("unrecognized event kind: {x}"))),
836        }
837    }
838}
839
840/// Add the initial native token supply.
841async fn add_genesis_native_token_allocation_supply<'a>(
842    dbtx: &mut PgTransaction<'a>,
843    content: &Content,
844) -> Result<()> {
845    fn content_mints(content: &Content) -> BTreeMap<asset::Id, Amount> {
846        let community_pool_mint = iter::once((
847            *STAKING_TOKEN_ASSET_ID,
848            content.community_pool_content.initial_balance.amount,
849        ));
850        let allocation_mints = content
851            .shielded_pool_content
852            .allocations
853            .iter()
854            .map(|allocation| {
855                let value = allocation.value();
856                (value.asset_id, value.amount)
857            });
858
859        let mut out = BTreeMap::new();
860        for (id, amount) in community_pool_mint.chain(allocation_mints) {
861            out.entry(id).and_modify(|x| *x += amount).or_insert(amount);
862        }
863        out
864    }
865
866    let mints = content_mints(content);
867
868    let unstaked_mint = u64::try_from(
869        mints
870            .get(&*STAKING_TOKEN_ASSET_ID)
871            .copied()
872            .unwrap_or_default()
873            .value(),
874    )?;
875    unstaked_supply::modify(dbtx, 0, |_| {
876        Ok(unstaked_supply::Supply {
877            um: unstaked_mint,
878            auction: 0,
879            dex: 0,
880            arb: 0,
881            fees: 0,
882        })
883    })
884    .await?;
885
886    // at genesis, assume a 1:1 ratio between delegation amount and native token amount.
887    for val in &content.stake_content.validators {
888        let val = Validator::try_from(val.clone())?;
889        let delegation_amount: i64 = mints
890            .get(&val.token().id())
891            .cloned()
892            .unwrap_or_default()
893            .value()
894            .try_into()?;
895
896        let val_id = delegated_supply::define_validator(dbtx, &val.identity_key).await?;
897        delegated_supply::modify(dbtx, val_id, 0, |_| {
898            delegated_supply::Supply::default().add_um(delegation_amount)
899        })
900        .await?;
901    }
902
903    Ok(())
904}
905
906#[derive(Debug)]
907pub struct Component {}
908
909impl Component {
910    pub fn new() -> Self {
911        Self {}
912    }
913}
914
915#[async_trait]
916impl AppView for Component {
917    fn name(&self) -> String {
918        "supply".to_string()
919    }
920
921    async fn init_chain(
922        &self,
923        dbtx: &mut PgTransaction,
924        app_state: &serde_json::Value,
925    ) -> Result<(), anyhow::Error> {
926        unstaked_supply::init_db(dbtx).await?;
927        delegated_supply::init_db(dbtx).await?;
928
929        // decode the initial supply from the genesis
930        // initial app state is not recomputed from events, because events are not emitted in init_chain.
931        // instead, the indexer directly parses the genesis.
932        add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?)
933            .await?;
934
935        Ok(())
936    }
937
938    async fn index_batch(
939        &self,
940        dbtx: &mut PgTransaction,
941        batch: EventBatch,
942        _ctx: EventBatchContext,
943    ) -> Result<(), anyhow::Error> {
944        for event in batch.events() {
945            let e = match Event::try_from(event) {
946                Ok(e) => e,
947                Err(_) => continue,
948            };
949            e.index(dbtx).await?;
950        }
951        Ok(())
952    }
953}