pindexer/stake/
validator_set.rs

1use std::collections::BTreeMap;
2
3use anyhow::{anyhow, Result};
4use cometindex::{
5    async_trait,
6    index::{EventBatch, EventBatchContext},
7    sqlx, AppView, ContextualizedEvent, PgTransaction,
8};
9
10use penumbra_sdk_app::genesis::Content;
11use penumbra_sdk_asset::asset;
12use penumbra_sdk_num::Amount;
13use penumbra_sdk_proto::{core::component::stake::v1 as pb, event::ProtoEvent};
14use penumbra_sdk_stake::{
15    validator::{self, Validator},
16    IdentityKey,
17};
18
19use crate::parsing::parse_content;
20
/// Indexer app view that tracks the validator set.
///
/// The struct carries no state of its own; its `AppView` implementation
/// creates the `stake_validator_set` table in `init_chain` and keeps it up to
/// date from staking events in `index_batch`.
#[derive(Debug)]
pub struct ValidatorSet {}
23
24impl ValidatorSet {
25    async fn index_event(
26        &self,
27        dbtx: &mut PgTransaction<'_>,
28        event: ContextualizedEvent<'_>,
29    ) -> Result<(), anyhow::Error> {
30        match event.event.kind.as_str() {
31            "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => {
32                let pe = pb::EventValidatorDefinitionUpload::from_event(event.as_ref())?;
33                let val = Validator::try_from(
34                    pe.validator
35                        .ok_or_else(|| anyhow!("missing validator in event"))?,
36                )?;
37
38                handle_upload(dbtx, val).await?;
39            }
40            "penumbra.core.component.stake.v1.EventDelegate" => {
41                let pe = pb::EventDelegate::from_event(event.as_ref())?;
42                let ik = IdentityKey::try_from(
43                    pe.identity_key
44                        .ok_or_else(|| anyhow!("missing ik in event"))?,
45                )?;
46                let amount = Amount::try_from(
47                    pe.amount
48                        .ok_or_else(|| anyhow!("missing amount in event"))?,
49                )?;
50
51                handle_delegate(dbtx, ik, amount).await?;
52            }
53            "penumbra.core.component.stake.v1.EventUndelegate" => {
54                let pe = pb::EventUndelegate::from_event(event.as_ref())?;
55                let ik = IdentityKey::try_from(
56                    pe.identity_key
57                        .ok_or_else(|| anyhow!("missing ik in event"))?,
58                )?;
59                let amount = Amount::try_from(
60                    pe.amount
61                        .ok_or_else(|| anyhow!("missing amount in event"))?,
62                )?;
63                handle_undelegate(dbtx, ik, amount).await?;
64            }
65            "penumbra.core.component.stake.v1.EventValidatorVotingPowerChange" => {
66                let pe = pb::EventValidatorVotingPowerChange::from_event(event.as_ref())?;
67                let ik = IdentityKey::try_from(
68                    pe.identity_key
69                        .ok_or_else(|| anyhow!("missing ik in event"))?,
70                )?;
71                let voting_power = Amount::try_from(
72                    pe.voting_power
73                        .ok_or_else(|| anyhow!("missing amount in event"))?,
74                )?;
75                handle_voting_power_change(dbtx, ik, voting_power).await?;
76            }
77            "penumbra.core.component.stake.v1.EventValidatorStateChange" => {
78                let pe = pb::EventValidatorStateChange::from_event(event.as_ref())?;
79                let ik = IdentityKey::try_from(
80                    pe.identity_key
81                        .ok_or_else(|| anyhow!("missing ik in event"))?,
82                )?;
83                let state = validator::State::try_from(
84                    pe.state.ok_or_else(|| anyhow!("missing state in event"))?,
85                )?;
86                handle_validator_state_change(dbtx, ik, state).await?;
87            }
88            "penumbra.core.component.stake.v1.EventValidatorBondingStateChange" => {
89                let pe = pb::EventValidatorBondingStateChange::from_event(event.as_ref())?;
90                let ik = IdentityKey::try_from(
91                    pe.identity_key
92                        .ok_or_else(|| anyhow!("missing ik in event"))?,
93                )?;
94                let bonding_state = validator::BondingState::try_from(
95                    pe.bonding_state
96                        .ok_or_else(|| anyhow!("missing bonding_state in event"))?,
97                )?;
98                handle_validator_bonding_state_change(dbtx, ik, bonding_state).await?;
99            }
100            _ => {}
101        }
102
103        Ok(())
104    }
105}
106
107#[async_trait]
108impl AppView for ValidatorSet {
109    async fn init_chain(
110        &self,
111        dbtx: &mut PgTransaction,
112        app_state: &serde_json::Value,
113    ) -> Result<(), anyhow::Error> {
114        sqlx::query(
115            // table name is module path + struct name
116            // note: protobuf data is encoded as protojson for ease of consumers
117            // hence TEXT fields
118            "CREATE TABLE stake_validator_set (
119                id SERIAL PRIMARY KEY,
120                ik TEXT NOT NULL,
121                name TEXT NOT NULL,
122                definition TEXT NOT NULL,
123                voting_power BIGINT NOT NULL,
124                queued_delegations BIGINT NOT NULL,
125                queued_undelegations BIGINT NOT NULL,
126                validator_state TEXT NOT NULL,
127                bonding_state TEXT NOT NULL
128            );",
129        )
130        .execute(dbtx.as_mut())
131        .await?;
132
133        sqlx::query("CREATE UNIQUE INDEX idx_stake_validator_set_ik ON stake_validator_set(ik);")
134            .execute(dbtx.as_mut())
135            .await?;
136
137        add_genesis_validators(dbtx, &parse_content(app_state.clone())?).await?;
138        Ok(())
139    }
140
141    fn name(&self) -> String {
142        "stake/validator_set".to_string()
143    }
144
145    async fn index_batch(
146        &self,
147        dbtx: &mut PgTransaction,
148        batch: EventBatch,
149        _ctx: EventBatchContext,
150    ) -> Result<(), anyhow::Error> {
151        for event in batch.events() {
152            self.index_event(dbtx, event).await?;
153        }
154        Ok(())
155    }
156}
157
158async fn add_genesis_validators<'a>(dbtx: &mut PgTransaction<'a>, content: &Content) -> Result<()> {
159    // Given a genesis validator, we need to figure out its delegations at
160    // genesis by getting its delegation token then summing up all the allocations.
161    // Build up a table of the total allocations first.
162    let mut allos = BTreeMap::<asset::Id, Amount>::new();
163    for allo in &content.shielded_pool_content.allocations {
164        let value = allo.value();
165        let sum = allos.entry(value.asset_id).or_default();
166        *sum = sum
167            .checked_add(&value.amount)
168            .ok_or_else(|| anyhow::anyhow!("overflow adding genesis allos (should not happen)"))?;
169    }
170
171    for val in &content.stake_content.validators {
172        // FIXME: this shouldn't be a proto type but now that has been propagated
173        // all through the rest of the code for no reason
174        let val = Validator::try_from(val.clone())?;
175        let delegation_amount = allos.get(&val.token().id()).cloned().unwrap_or_default();
176
177        // insert sql
178        sqlx::query(
179            "INSERT INTO stake_validator_set (
180                ik, name, definition, voting_power, queued_delegations, 
181                queued_undelegations, validator_state, bonding_state
182            )
183            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
184        )
185        .bind(val.identity_key.to_string())
186        .bind(val.name.clone())
187        .bind(serde_json::to_string(&val).expect("can serialize"))
188        .bind(delegation_amount.value() as i64)
189        .bind(0) // queued_delegations
190        .bind(0) // queued_undelegations
191        .bind(serde_json::to_string(&validator::State::Active).unwrap()) // see add_genesis_validator
192        .bind(serde_json::to_string(&validator::BondingState::Bonded).unwrap()) // see add_genesis_validator
193        .execute(dbtx.as_mut())
194        .await?;
195    }
196
197    Ok(())
198}
199
200async fn handle_upload<'a>(dbtx: &mut PgTransaction<'a>, val: Validator) -> Result<()> {
201    // First, check if the validator already exists
202    let exists: bool =
203        sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM stake_validator_set WHERE ik = $1)")
204            .bind(&val.identity_key.to_string())
205            .fetch_one(dbtx.as_mut())
206            .await?;
207
208    if exists {
209        // Update existing validator, leaving all the other data like state, VP etc unchanged
210        sqlx::query(
211            "UPDATE stake_validator_set SET
212                name = $2,
213                definition = $3
214            WHERE ik = $1",
215        )
216        .bind(val.identity_key.to_string())
217        .bind(val.name.clone())
218        .bind(serde_json::to_string(&val).expect("can serialize"))
219        .execute(dbtx.as_mut())
220        .await?;
221    } else {
222        // Insert new validator
223        sqlx::query(
224            "INSERT INTO stake_validator_set (
225                ik, name, definition, voting_power, queued_delegations, 
226                queued_undelegations, validator_state, bonding_state
227            )
228            VALUES ($1, $2, $3, 0, 0, 0, $4, $5)",
229        )
230        .bind(val.identity_key.to_string())
231        .bind(val.name.clone())
232        .bind(serde_json::to_string(&val).expect("can serialize"))
233        .bind(serde_json::to_string(&validator::State::Defined).expect("can serialize")) // ValidatorManager::add_validator
234        .bind(serde_json::to_string(&validator::BondingState::Unbonded).expect("can serialize")) // ValidatorManager::add_validator
235        .execute(dbtx.as_mut())
236        .await?;
237    }
238
239    Ok(())
240}
241
242async fn handle_delegate<'a>(
243    dbtx: &mut PgTransaction<'a>,
244    ik: IdentityKey,
245    amount: Amount,
246) -> Result<()> {
247    // Update the validator's voting power and queued delegations
248    let rows_affected = sqlx::query(
249        "UPDATE stake_validator_set 
250        SET 
251            queued_delegations = queued_delegations + $2
252        WHERE ik = $1",
253    )
254    .bind(ik.to_string())
255    .bind(amount.value() as i64)
256    .execute(dbtx.as_mut())
257    .await?
258    .rows_affected();
259
260    // Check if the update was successful
261    if rows_affected == 0 {
262        anyhow::bail!("No validator found with the given identity key");
263    }
264
265    Ok(())
266}
267
268async fn handle_undelegate<'a>(
269    dbtx: &mut PgTransaction<'a>,
270    ik: IdentityKey,
271    amount: Amount,
272) -> Result<()> {
273    // Update only the queued undelegations
274    let rows_affected = sqlx::query(
275        "UPDATE stake_validator_set 
276        SET 
277            queued_undelegations = queued_undelegations + $2
278        WHERE ik = $1",
279    )
280    .bind(ik.to_string())
281    .bind(amount.value() as i64)
282    .execute(dbtx.as_mut())
283    .await?
284    .rows_affected();
285
286    // Check if the update was successful
287    if rows_affected == 0 {
288        anyhow::bail!("No validator found with the given identity key");
289    }
290
291    Ok(())
292}
293
294async fn handle_voting_power_change<'a>(
295    dbtx: &mut PgTransaction<'a>,
296    ik: IdentityKey,
297    voting_power: Amount,
298) -> Result<()> {
299    // Update the validator's voting power and reset queued delegations/undelegations
300    let rows_affected = sqlx::query(
301        "UPDATE stake_validator_set 
302        SET 
303            voting_power = $2, 
304            queued_delegations = 0,
305            queued_undelegations = 0
306        WHERE ik = $1",
307    )
308    .bind(ik.to_string())
309    .bind(voting_power.value() as i64)
310    .execute(dbtx.as_mut())
311    .await?
312    .rows_affected();
313
314    // Check if the update was successful
315    if rows_affected == 0 {
316        anyhow::bail!("No validator found with the given identity key");
317    }
318
319    Ok(())
320}
321
322async fn handle_validator_state_change<'a>(
323    dbtx: &mut PgTransaction<'a>,
324    ik: IdentityKey,
325    state: validator::State,
326) -> Result<()> {
327    // Update the validator's state
328    let rows_affected = sqlx::query(
329        "UPDATE stake_validator_set 
330        SET 
331            validator_state = $2
332        WHERE ik = $1",
333    )
334    .bind(ik.to_string())
335    .bind(serde_json::to_string(&state).expect("can serialize"))
336    .execute(dbtx.as_mut())
337    .await?
338    .rows_affected();
339
340    // Check if the update was successful
341    if rows_affected == 0 {
342        anyhow::bail!("No validator found with the given identity key");
343    }
344
345    Ok(())
346}
347
348async fn handle_validator_bonding_state_change<'a>(
349    dbtx: &mut PgTransaction<'a>,
350    ik: IdentityKey,
351    bonding_state: validator::BondingState,
352) -> Result<()> {
353    // Update the validator's bonding state
354    let rows_affected = sqlx::query(
355        "UPDATE stake_validator_set 
356        SET 
357            bonding_state = $2
358        WHERE ik = $1",
359    )
360    .bind(ik.to_string())
361    .bind(serde_json::to_string(&bonding_state).expect("can serialize"))
362    .execute(dbtx.as_mut())
363    .await?
364    .rows_affected();
365
366    // Check if the update was successful
367    if rows_affected == 0 {
368        anyhow::bail!("No validator found with the given identity key");
369    }
370
371    Ok(())
372}