1use std::collections::BTreeMap;
2
3use anyhow::{anyhow, Result};
4use cometindex::{
5 async_trait,
6 index::{EventBatch, EventBatchContext},
7 sqlx, AppView, ContextualizedEvent, PgTransaction,
8};
9use penumbra_sdk_app::genesis::Content;
10use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
11use penumbra_sdk_num::Amount;
12use penumbra_sdk_proto::{
13 event::ProtoEvent,
14 penumbra::core::component::{
15 auction::v1 as pb_auction, dex::v1 as pb_dex, fee::v1 as pb_fee, funding::v1 as pb_funding,
16 stake::v1 as pb_stake,
17 },
18};
19use penumbra_sdk_stake::{rate::RateData, validator::Validator, IdentityKey};
20use sqlx::{Postgres, Transaction};
21use std::iter;
22
23use crate::parsing::parse_content;
24
25mod unstaked_supply {
26 use anyhow::Result;
28 use cometindex::PgTransaction;
29
30 pub async fn init_db(dbtx: &mut PgTransaction<'_>) -> Result<()> {
32 sqlx::query(
33 r#"
34 CREATE TABLE IF NOT EXISTS supply_total_unstaked (
35 height BIGINT PRIMARY KEY,
36 um BIGINT NOT NULL,
37 auction BIGINT NOT NULL,
38 dex BIGINT NOT NULL,
39 arb BIGINT NOT NULL,
40 fees BIGINT NOT NULL
41 );
42 "#,
43 )
44 .execute(dbtx.as_mut())
45 .await?;
46 Ok(())
47 }
48
49 #[derive(Clone, Copy, Debug, Default, PartialEq)]
51 pub struct Supply {
52 pub um: u64,
54 pub auction: i64,
56 pub dex: i64,
58 pub arb: u64,
60 pub fees: u64,
62 }
63
64 async fn get_supply(dbtx: &mut PgTransaction<'_>, height: u64) -> Result<Option<Supply>> {
66 let row: Option<(i64, i64, i64, i64, i64)> = sqlx::query_as(
67 "SELECT um, auction, dex, arb, fees FROM supply_total_unstaked WHERE height <= $1 ORDER BY height DESC LIMIT 1",
68 )
69 .bind(i64::try_from(height)?)
70 .fetch_optional(dbtx.as_mut())
71 .await?;
72 match row {
73 None => Ok(None),
74 Some((um, auction, dex, arb, fees)) => Ok(Some(Supply {
75 um: um.try_into()?,
76 auction: auction.try_into()?,
77 dex: dex.try_into()?,
78 arb: arb.try_into()?,
79 fees: fees.try_into()?,
80 })),
81 }
82 }
83
84 async fn set_supply(dbtx: &mut PgTransaction<'_>, height: u64, supply: Supply) -> Result<()> {
86 sqlx::query(
87 r#"
88 INSERT INTO
89 supply_total_unstaked
90 VALUES ($1, $2, $3, $4, $5, $6)
91 ON CONFLICT (height)
92 DO UPDATE SET
93 um = excluded.um,
94 auction = excluded.auction,
95 dex = excluded.dex,
96 arb = excluded.arb,
97 fees = excluded.fees
98 "#,
99 )
100 .bind(i64::try_from(height)?)
101 .bind(i64::try_from(supply.um)?)
102 .bind(i64::try_from(supply.auction)?)
103 .bind(i64::try_from(supply.dex)?)
104 .bind(i64::try_from(supply.arb)?)
105 .bind(i64::try_from(supply.fees)?)
106 .execute(dbtx.as_mut())
107 .await?;
108 Ok(())
109 }
110
111 pub async fn modify(
116 dbtx: &mut PgTransaction<'_>,
117 height: u64,
118 f: impl FnOnce(Option<Supply>) -> Result<Supply>,
119 ) -> Result<()> {
120 let supply = get_supply(dbtx, height).await?;
121 let new_supply = f(supply)?;
123 set_supply(dbtx, height, new_supply).await
125 }
126}
127
128mod delegated_supply {
129 use anyhow::{anyhow, Result};
131 use cometindex::PgTransaction;
132 use penumbra_sdk_num::fixpoint::U128x128;
133 use penumbra_sdk_stake::{rate::RateData, IdentityKey};
134
135 const BPS_SQUARED: u64 = 1_0000_0000u64;
136
137 #[derive(Clone, Copy)]
142 pub struct Supply {
143 um: u64,
144 del_um: u64,
145 rate_bps2: u64,
146 }
147
148 impl Default for Supply {
149 fn default() -> Self {
150 Self {
151 um: 0,
152 del_um: 0,
153 rate_bps2: BPS_SQUARED,
154 }
155 }
156 }
157
158 impl Supply {
159 pub fn add_um(self, delta: i64) -> Result<Self> {
161 let rate = U128x128::ratio(self.rate_bps2, BPS_SQUARED)?;
162 let negate = delta.is_negative();
163 let delta = delta.unsigned_abs();
164 let um_delta = delta;
165 let del_um_delta = if rate == U128x128::from(0u128) {
166 0u64
167 } else {
168 let del_um_delta = (U128x128::from(delta) / rate)?;
169 let rounded = if negate {
170 del_um_delta.round_up()?
172 } else {
173 del_um_delta.round_down()
175 };
176 rounded.try_into()?
177 };
178 let out = if negate {
179 Self {
180 um: self
181 .um
182 .checked_sub(um_delta)
183 .ok_or(anyhow!("supply modification failed"))?,
184 del_um: self
185 .del_um
186 .checked_sub(del_um_delta)
187 .ok_or(anyhow!("supply modification failed"))?,
188 rate_bps2: self.rate_bps2,
189 }
190 } else {
191 Self {
192 um: self
193 .um
194 .checked_add(um_delta)
195 .ok_or(anyhow!("supply modification failed"))?,
196 del_um: self
197 .del_um
198 .checked_add(del_um_delta)
199 .ok_or(anyhow!("supply modification failed"))?,
200 rate_bps2: self.rate_bps2,
201 }
202 };
203 Ok(out)
204 }
205
206 pub fn change_rate(self, rate: &RateData) -> Result<Self> {
208 let um = rate
209 .unbonded_amount(self.del_um.into())
210 .value()
211 .try_into()?;
212
213 Ok(Self {
214 um,
215 del_um: self.del_um,
216 rate_bps2: rate.validator_exchange_rate.value().try_into()?,
217 })
218 }
219 }
220
221 pub async fn init_db<'d>(dbtx: &mut PgTransaction<'d>) -> Result<()> {
223 sqlx::query(
224 r#"
225 CREATE TABLE IF NOT EXISTS supply_validators (
226 id SERIAL PRIMARY KEY,
227 identity_key TEXT NOT NULL
228 );
229 "#,
230 )
231 .execute(dbtx.as_mut())
232 .await?;
233 sqlx::query(
234 r#"
235 CREATE TABLE IF NOT EXISTS supply_total_staked (
236 validator_id INT REFERENCES supply_validators(id),
237 height BIGINT NOT NULL,
238 um BIGINT NOT NULL,
239 del_um BIGINT NOT NULL,
240 rate_bps2 BIGINT NOT NULL,
241 PRIMARY KEY (validator_id, height)
242 );
243 "#,
244 )
245 .execute(dbtx.as_mut())
246 .await?;
247 Ok(())
248 }
249
250 #[derive(Clone, Copy, PartialEq)]
252 pub struct ValidatorID(i32);
253
254 pub async fn define_validator(
258 dbtx: &mut PgTransaction<'_>,
259 identity_key: &IdentityKey,
260 ) -> Result<ValidatorID> {
261 let ik_string = identity_key.to_string();
262
263 let id: Option<i32> =
264 sqlx::query_scalar(r#"SELECT id FROM supply_validators WHERE identity_key = $1"#)
265 .bind(&ik_string)
266 .fetch_optional(dbtx.as_mut())
267 .await?;
268
269 if let Some(id) = id {
270 return Ok(ValidatorID(id));
271 }
272 let id = sqlx::query_scalar(
273 r#"INSERT INTO supply_validators VALUES (DEFAULT, $1) RETURNING id"#,
274 )
275 .bind(&ik_string)
276 .fetch_one(dbtx.as_mut())
277 .await?;
278 Ok(ValidatorID(id))
279 }
280
281 async fn get_supply(
283 dbtx: &mut PgTransaction<'_>,
284 validator: ValidatorID,
285 height: u64,
286 ) -> Result<Option<Supply>> {
287 let row: Option<(i64, i64, i64)> = sqlx::query_as(
288 r#"
289 SELECT
290 um, del_um, rate_bps2
291 FROM
292 supply_total_staked
293 WHERE
294 validator_id = $1 AND height <= $2
295 ORDER BY height DESC
296 LIMIT 1
297 "#,
298 )
299 .bind(validator.0)
300 .bind(i64::try_from(height)?)
301 .fetch_optional(dbtx.as_mut())
302 .await?;
303 row.map(|(um, del_um, rate_bps2)| {
304 let um = um.try_into()?;
305 let del_um = del_um.try_into()?;
306 let rate_bps2 = rate_bps2.try_into()?;
307 Ok(Supply {
308 um,
309 del_um,
310 rate_bps2,
311 })
312 })
313 .transpose()
314 }
315
316 async fn set_supply(
318 dbtx: &mut PgTransaction<'_>,
319 validator: ValidatorID,
320 height: u64,
321 supply: Supply,
322 ) -> Result<()> {
323 sqlx::query(
324 r#"
325 INSERT INTO
326 supply_total_staked
327 VALUES ($1, $2, $3, $4, $5)
328 ON CONFLICT (validator_id, height)
329 DO UPDATE SET
330 um = excluded.um,
331 del_um = excluded.del_um,
332 rate_bps2 = excluded.rate_bps2
333 "#,
334 )
335 .bind(validator.0)
336 .bind(i64::try_from(height)?)
337 .bind(i64::try_from(supply.um)?)
338 .bind(i64::try_from(supply.del_um)?)
339 .bind(i64::try_from(supply.rate_bps2)?)
340 .execute(dbtx.as_mut())
341 .await?;
342 Ok(())
343 }
344
345 pub async fn modify(
347 dbtx: &mut PgTransaction<'_>,
348 validator: ValidatorID,
349 height: u64,
350 f: impl FnOnce(Option<Supply>) -> Result<Supply>,
351 ) -> Result<()> {
352 let supply = get_supply(dbtx, validator, height).await?;
353 let new_supply = f(supply)?;
354 set_supply(dbtx, validator, height, new_supply).await
355 }
356}
357
358#[derive(Clone, Debug)]
367enum Event {
368 Undelegate {
370 height: u64,
371 identity_key: IdentityKey,
372 unbonded_amount: Amount,
373 },
374 Delegate {
376 height: u64,
377 identity_key: IdentityKey,
378 amount: Amount,
379 },
380 FundingStreamReward { height: u64, reward_amount: Amount },
382 RateDataChange {
384 height: u64,
385 identity_key: IdentityKey,
386 rate_data: RateData,
387 },
388 AuctionVCBCredit {
390 height: u64,
391 asset_id: asset::Id,
392 previous_balance: Amount,
393 new_balance: Amount,
394 },
395 AuctionVCBDebit {
397 height: u64,
398 asset_id: asset::Id,
399 previous_balance: Amount,
400 new_balance: Amount,
401 },
402 DexVCBCredit {
404 height: u64,
405 asset_id: asset::Id,
406 previous_balance: Amount,
407 new_balance: Amount,
408 },
409 DexVCBDebit {
411 height: u64,
412 asset_id: asset::Id,
413 previous_balance: Amount,
414 new_balance: Amount,
415 },
416 DexArb {
417 height: u64,
418 swap_execution: penumbra_sdk_dex::SwapExecution,
419 },
420 BlockFees {
421 height: u64,
422 total: penumbra_sdk_fee::Fee,
423 },
424}
425
426impl Event {
427 const NAMES: [&'static str; 10] = [
428 "penumbra.core.component.stake.v1.EventUndelegate",
429 "penumbra.core.component.stake.v1.EventDelegate",
430 "penumbra.core.component.funding.v1.EventFundingStreamReward",
431 "penumbra.core.component.stake.v1.EventRateDataChange",
432 "penumbra.core.component.auction.v1.EventValueCircuitBreakerCredit",
433 "penumbra.core.component.auction.v1.EventValueCircuitBreakerDebit",
434 "penumbra.core.component.dex.v1.EventValueCircuitBreakerCredit",
435 "penumbra.core.component.dex.v1.EventValueCircuitBreakerDebit",
436 "penumbra.core.component.dex.v1.EventArbExecution",
437 "penumbra.core.component.fee.v1.EventBlockFees",
438 ];
439
440 async fn index<'d>(&self, dbtx: &mut Transaction<'d, Postgres>) -> anyhow::Result<()> {
441 match self {
442 Event::Delegate {
443 height,
444 identity_key,
445 amount,
446 } => {
447 let amount = i64::try_from(amount.value())?;
448
449 unstaked_supply::modify(dbtx, *height, |current| {
450 let current = current.unwrap_or_default();
451 Ok(unstaked_supply::Supply {
452 um: current.um - amount as u64,
453 ..current
454 })
455 })
456 .await?;
457
458 let validator = delegated_supply::define_validator(dbtx, identity_key).await?;
459 delegated_supply::modify(dbtx, validator, *height, |current| {
460 current.unwrap_or_default().add_um(amount)
461 })
462 .await
463 }
464 Event::Undelegate {
465 height,
466 identity_key,
467 unbonded_amount,
468 } => {
469 let amount = i64::try_from(unbonded_amount.value())?;
470
471 unstaked_supply::modify(dbtx, *height, |current| {
472 let current = current.unwrap_or_default();
473 Ok(unstaked_supply::Supply {
474 um: current.um + amount as u64,
475 ..current
476 })
477 })
478 .await?;
479
480 let validator = delegated_supply::define_validator(dbtx, identity_key).await?;
481 delegated_supply::modify(dbtx, validator, *height, |current| {
482 current.unwrap_or_default().add_um(-amount)
483 })
484 .await
485 }
486 Event::FundingStreamReward {
487 height,
488 reward_amount,
489 } => {
490 let amount = u64::try_from(reward_amount.value())?;
491
492 unstaked_supply::modify(dbtx, *height, |current| {
493 let current = current.unwrap_or_default();
494 Ok(unstaked_supply::Supply {
495 um: current.um + amount as u64,
496 ..current
497 })
498 })
499 .await
500 }
501 Event::RateDataChange {
502 height,
503 identity_key,
504 rate_data,
505 } => {
506 let validator = delegated_supply::define_validator(dbtx, identity_key).await?;
507 delegated_supply::modify(dbtx, validator, *height, |current| {
508 current.unwrap_or_default().change_rate(rate_data)
509 })
510 .await
511 }
512 Event::AuctionVCBCredit {
513 height,
514 asset_id,
515 previous_balance,
516 new_balance,
517 } => {
518 if *asset_id != *STAKING_TOKEN_ASSET_ID {
519 return Ok(());
520 }
521
522 let added = i64::try_from(new_balance.value() - previous_balance.value())?;
523 unstaked_supply::modify(dbtx, *height, |current| {
524 let current = current.unwrap_or_default();
525 Ok(unstaked_supply::Supply {
526 um: current.um - added as u64,
527 auction: current.auction.checked_add(added).ok_or(anyhow!(format!(
528 "AuctionVCB overflow: {} + {}",
529 current.auction, added
530 )))?,
531 ..current
532 })
533 })
534 .await
535 }
536 Event::AuctionVCBDebit {
537 height,
538 asset_id,
539 previous_balance,
540 new_balance,
541 } => {
542 if *asset_id != *STAKING_TOKEN_ASSET_ID {
543 return Ok(());
544 }
545
546 let removed = i64::try_from(previous_balance.value() - new_balance.value())?;
547 unstaked_supply::modify(dbtx, *height, |current| {
548 let current = current.unwrap_or_default();
549 Ok(unstaked_supply::Supply {
550 um: current.um + removed as u64,
551 auction: current.auction.checked_sub(removed).ok_or(anyhow!(format!(
552 "AuctionVCB underflow: {} - {}",
553 current.auction, removed
554 )))?,
555 ..current
556 })
557 })
558 .await
559 }
560 Event::DexVCBCredit {
561 height,
562 asset_id,
563 previous_balance,
564 new_balance,
565 } => {
566 if *asset_id != *STAKING_TOKEN_ASSET_ID {
567 return Ok(());
568 }
569
570 let added = i64::try_from(new_balance.value() - previous_balance.value())?;
571 unstaked_supply::modify(dbtx, *height, |current| {
572 let current = current.unwrap_or_default();
573 Ok(unstaked_supply::Supply {
574 um: current.um - added as u64,
575 dex: current.dex.checked_sub(added).ok_or(anyhow!(format!(
576 "DexVCB overflow: {} + {}",
577 current.dex, added
578 )))?,
579 ..current
580 })
581 })
582 .await
583 }
584 Event::DexVCBDebit {
585 height,
586 asset_id,
587 previous_balance,
588 new_balance,
589 } => {
590 if *asset_id != *STAKING_TOKEN_ASSET_ID {
591 return Ok(());
592 }
593
594 let removed = i64::try_from(previous_balance.value() - new_balance.value())?;
595 unstaked_supply::modify(dbtx, *height, |current| {
596 let current = current.unwrap_or_default();
597 Ok(unstaked_supply::Supply {
598 um: current.um + removed as u64,
599 dex: current.dex.checked_add(removed).ok_or(anyhow!(format!(
600 "DexVCB underflow: {} - {}",
601 current.dex, removed
602 )))?,
603 ..current
604 })
605 })
606 .await
607 }
608 Event::DexArb {
609 height,
610 swap_execution,
611 } => {
612 let input = swap_execution.input;
613 let output = swap_execution.output;
614 if input.asset_id != output.asset_id || input.asset_id != *STAKING_TOKEN_ASSET_ID {
616 return Ok(());
617 }
618
619 let profit = u64::try_from((output.amount - input.amount).value())?;
620 unstaked_supply::modify(dbtx, *height, |current| {
621 let current = current.unwrap_or_default();
622 Ok(unstaked_supply::Supply {
623 um: current.um - profit,
624 arb: current.arb + profit,
625 ..current
626 })
627 })
628 .await
629 }
630 Event::BlockFees { height, total } => {
631 if total.asset_id() != *STAKING_TOKEN_ASSET_ID {
632 return Ok(());
633 }
634 let amount = u64::try_from(total.amount().value())?;
635 if amount == 0 {
637 return Ok(());
638 }
639 unstaked_supply::modify(dbtx, *height, |current| {
642 let current = current.unwrap_or_default();
643 Ok(unstaked_supply::Supply {
644 um: current.um - amount,
645 fees: current.fees + amount,
646 ..current
647 })
648 })
649 .await
650 }
651 }
652 }
653}
654
655impl TryFrom<ContextualizedEvent<'_>> for Event {
656 type Error = anyhow::Error;
657
658 fn try_from(event: ContextualizedEvent<'_>) -> Result<Self, Self::Error> {
659 match event.event.kind.as_str() {
660 x if x == Event::NAMES[0] => {
662 let pe = pb_stake::EventUndelegate::from_event(event.as_ref())?;
663 let identity_key = pe
664 .identity_key
665 .ok_or(anyhow!("EventUndelegate should contain identity key"))?
666 .try_into()?;
667 let unbonded_amount = pe
668 .amount
669 .ok_or(anyhow!("EventUndelegate should contain amount"))?
670 .try_into()?;
671 Ok(Self::Undelegate {
672 height: event.block_height,
673 identity_key,
674 unbonded_amount,
675 })
676 }
677 x if x == Event::NAMES[1] => {
679 let pe = pb_stake::EventDelegate::from_event(event.as_ref())?;
680 let identity_key = pe
681 .identity_key
682 .ok_or(anyhow!("EventDelegate should contain identity key"))?
683 .try_into()?;
684 let amount = pe
685 .amount
686 .ok_or(anyhow!("EventDelegate should contain amount"))?
687 .try_into()?;
688 Ok(Self::Delegate {
689 height: event.block_height,
690 identity_key,
691 amount,
692 })
693 }
694 x if x == Event::NAMES[2] => {
696 let pe = pb_funding::EventFundingStreamReward::from_event(event.as_ref())?;
697 let reward_amount = Amount::try_from(
698 pe.reward_amount
699 .ok_or(anyhow!("event missing in funding stream reward"))?,
700 )?;
701 Ok(Self::FundingStreamReward {
702 height: event.block_height,
703 reward_amount,
704 })
705 }
706 x if x == Event::NAMES[3] => {
708 let pe = pb_stake::EventRateDataChange::from_event(event.as_ref())?;
709 let identity_key = pe
710 .identity_key
711 .ok_or(anyhow!("EventRateDataChange should contain identity key"))?
712 .try_into()?;
713 let rate_data = pe
714 .rate_data
715 .ok_or(anyhow!("EventRateDataChange should contain rate data"))?
716 .try_into()?;
717 Ok(Self::RateDataChange {
718 height: event.block_height,
719 identity_key,
720 rate_data,
721 })
722 }
723 x if x == Event::NAMES[4] => {
725 let pe = pb_auction::EventValueCircuitBreakerCredit::from_event(event.as_ref())?;
726 let asset_id = pe
727 .asset_id
728 .ok_or(anyhow!("AuctionVCBCredit missing asset_id"))?
729 .try_into()?;
730 let previous_balance = pe
731 .previous_balance
732 .ok_or(anyhow!("AuctionVCBCredit missing previous_balance"))?
733 .try_into()?;
734 let new_balance = pe
735 .new_balance
736 .ok_or(anyhow!("AuctionVCBCredit missing previous_balance"))?
737 .try_into()?;
738 Ok(Self::AuctionVCBCredit {
739 height: event.block_height,
740 asset_id,
741 previous_balance,
742 new_balance,
743 })
744 }
745 x if x == Event::NAMES[5] => {
747 let pe = pb_auction::EventValueCircuitBreakerDebit::from_event(event.as_ref())?;
748 let asset_id = pe
749 .asset_id
750 .ok_or(anyhow!("AuctionVCBDebit missing asset_id"))?
751 .try_into()?;
752 let previous_balance = pe
753 .previous_balance
754 .ok_or(anyhow!("AuctionVCBDebit missing previous_balance"))?
755 .try_into()?;
756 let new_balance = pe
757 .new_balance
758 .ok_or(anyhow!("AuctionVCBDebit missing previous_balance"))?
759 .try_into()?;
760 Ok(Self::AuctionVCBDebit {
761 height: event.block_height,
762 asset_id,
763 previous_balance,
764 new_balance,
765 })
766 }
767 x if x == Event::NAMES[6] => {
769 let pe = pb_dex::EventValueCircuitBreakerCredit::from_event(event.as_ref())?;
770 let asset_id = pe
771 .asset_id
772 .ok_or(anyhow!("DexVCBCredit missing asset_id"))?
773 .try_into()?;
774 let previous_balance = pe
775 .previous_balance
776 .ok_or(anyhow!("DexVCBCredit missing previous_balance"))?
777 .try_into()?;
778 let new_balance = pe
779 .new_balance
780 .ok_or(anyhow!("DexVCBCredit missing previous_balance"))?
781 .try_into()?;
782 Ok(Self::DexVCBCredit {
783 height: event.block_height,
784 asset_id,
785 previous_balance,
786 new_balance,
787 })
788 }
789 x if x == Event::NAMES[7] => {
791 let pe = pb_dex::EventValueCircuitBreakerDebit::from_event(event.as_ref())?;
792 let asset_id = pe
793 .asset_id
794 .ok_or(anyhow!("DexVCBDebit missing asset_id"))?
795 .try_into()?;
796 let previous_balance = pe
797 .previous_balance
798 .ok_or(anyhow!("DexVCBDebit missing previous_balance"))?
799 .try_into()?;
800 let new_balance = pe
801 .new_balance
802 .ok_or(anyhow!("DexVCBDebit missing previous_balance"))?
803 .try_into()?;
804 Ok(Self::DexVCBDebit {
805 height: event.block_height,
806 asset_id,
807 previous_balance,
808 new_balance,
809 })
810 }
811 x if x == Event::NAMES[8] => {
813 let pe = pb_dex::EventArbExecution::from_event(event.as_ref())?;
814 let swap_execution = pe
815 .swap_execution
816 .ok_or(anyhow!("EventArbExecution missing swap_execution"))?
817 .try_into()?;
818 Ok(Self::DexArb {
819 height: event.block_height,
820 swap_execution,
821 })
822 }
823 x if x == Event::NAMES[9] => {
825 let pe = pb_fee::EventBlockFees::from_event(event.as_ref())?;
826 let total = pe
827 .swapped_fee_total
828 .ok_or(anyhow!("EventBlockFees missing swapped_fee_total"))?
829 .try_into()?;
830 Ok(Self::BlockFees {
831 height: event.block_height,
832 total,
833 })
834 }
835 x => Err(anyhow!(format!("unrecognized event kind: {x}"))),
836 }
837 }
838}
839
840async fn add_genesis_native_token_allocation_supply<'a>(
842 dbtx: &mut PgTransaction<'a>,
843 content: &Content,
844) -> Result<()> {
845 fn content_mints(content: &Content) -> BTreeMap<asset::Id, Amount> {
846 let community_pool_mint = iter::once((
847 *STAKING_TOKEN_ASSET_ID,
848 content.community_pool_content.initial_balance.amount,
849 ));
850 let allocation_mints = content
851 .shielded_pool_content
852 .allocations
853 .iter()
854 .map(|allocation| {
855 let value = allocation.value();
856 (value.asset_id, value.amount)
857 });
858
859 let mut out = BTreeMap::new();
860 for (id, amount) in community_pool_mint.chain(allocation_mints) {
861 out.entry(id).and_modify(|x| *x += amount).or_insert(amount);
862 }
863 out
864 }
865
866 let mints = content_mints(content);
867
868 let unstaked_mint = u64::try_from(
869 mints
870 .get(&*STAKING_TOKEN_ASSET_ID)
871 .copied()
872 .unwrap_or_default()
873 .value(),
874 )?;
875 unstaked_supply::modify(dbtx, 0, |_| {
876 Ok(unstaked_supply::Supply {
877 um: unstaked_mint,
878 auction: 0,
879 dex: 0,
880 arb: 0,
881 fees: 0,
882 })
883 })
884 .await?;
885
886 for val in &content.stake_content.validators {
888 let val = Validator::try_from(val.clone())?;
889 let delegation_amount: i64 = mints
890 .get(&val.token().id())
891 .cloned()
892 .unwrap_or_default()
893 .value()
894 .try_into()?;
895
896 let val_id = delegated_supply::define_validator(dbtx, &val.identity_key).await?;
897 delegated_supply::modify(dbtx, val_id, 0, |_| {
898 delegated_supply::Supply::default().add_um(delegation_amount)
899 })
900 .await?;
901 }
902
903 Ok(())
904}
905
/// The supply app view. It holds no state of its own; everything it tracks
/// lives in the database.
#[derive(Debug, Default)]
pub struct Component {}

impl Component {
    /// Create the component. Equivalent to `Component::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
914
915#[async_trait]
916impl AppView for Component {
917 fn name(&self) -> String {
918 "supply".to_string()
919 }
920
921 async fn init_chain(
922 &self,
923 dbtx: &mut PgTransaction,
924 app_state: &serde_json::Value,
925 ) -> Result<(), anyhow::Error> {
926 unstaked_supply::init_db(dbtx).await?;
927 delegated_supply::init_db(dbtx).await?;
928
929 add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?)
933 .await?;
934
935 Ok(())
936 }
937
938 async fn index_batch(
939 &self,
940 dbtx: &mut PgTransaction,
941 batch: EventBatch,
942 _ctx: EventBatchContext,
943 ) -> Result<(), anyhow::Error> {
944 for event in batch.events() {
945 let e = match Event::try_from(event) {
946 Ok(e) => e,
947 Err(_) => continue,
948 };
949 e.index(dbtx).await?;
950 }
951 Ok(())
952 }
953}