pindexer/stake/
delegation_txs.rs

1use anyhow::{anyhow, Result};
2use cometindex::{
3    async_trait,
4    index::{EventBatch, EventBatchContext},
5    sqlx, AppView, PgTransaction,
6};
7use penumbra_sdk_num::Amount;
8use penumbra_sdk_proto::{core::component::stake::v1 as pb, event::ProtoEvent};
9use penumbra_sdk_stake::IdentityKey;
10
11#[derive(Debug)]
12pub struct DelegationTxs {}
13
14#[async_trait]
15impl AppView for DelegationTxs {
16    async fn init_chain(
17        &self,
18        dbtx: &mut PgTransaction,
19        _app_state: &serde_json::Value,
20    ) -> Result<()> {
21        // Create the table
22        sqlx::query(
23            "CREATE TABLE stake_delegation_txs (
24                id SERIAL PRIMARY KEY,
25                ik TEXT NOT NULL,
26                amount BIGINT NOT NULL,
27                height BIGINT NOT NULL,
28                tx_hash BYTEA NOT NULL
29            );",
30        )
31        .execute(dbtx.as_mut())
32        .await?;
33
34        // Create index on ik
35        sqlx::query("CREATE INDEX idx_stake_delegation_txs_ik ON stake_delegation_txs(ik);")
36            .execute(dbtx.as_mut())
37            .await?;
38
39        // Create descending index on height
40        sqlx::query(
41            "CREATE INDEX idx_stake_delegation_txs_height ON stake_delegation_txs(height DESC);",
42        )
43        .execute(dbtx.as_mut())
44        .await?;
45
46        // Create composite index on ik and height (descending)
47        sqlx::query("CREATE INDEX idx_stake_delegation_txs_validator_ik_height ON stake_delegation_txs(ik, height DESC);")
48            .execute(dbtx.as_mut())
49            .await?;
50
51        Ok(())
52    }
53
54    fn name(&self) -> String {
55        "stake/delegation_txs".to_string()
56    }
57
58    async fn index_batch(
59        &self,
60        dbtx: &mut PgTransaction,
61        batch: EventBatch,
62        _ctx: EventBatchContext,
63    ) -> Result<()> {
64        for event in batch.events() {
65            let pe = match pb::EventDelegate::from_event(event.as_ref()) {
66                Ok(pe) => pe,
67                Err(_) => continue,
68            };
69
70            let ik: IdentityKey = pe
71                .identity_key
72                .ok_or_else(|| anyhow::anyhow!("missing ik in event"))?
73                .try_into()?;
74
75            let amount = Amount::try_from(
76                pe.amount
77                    .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?,
78            )?;
79
80            sqlx::query(
81            "INSERT INTO stake_delegation_txs (ik, amount, height, tx_hash) VALUES ($1, $2, $3, $4)"
82        )
83        .bind(ik.to_string())
84        .bind(amount.value() as i64)
85        .bind(event.block_height as i64)
86        .bind(event.tx_hash().ok_or_else(|| anyhow!("missing tx hash in event"))?)
87        .execute(dbtx.as_mut())
88        .await?;
89        }
90
91        Ok(())
92    }
93}