1use std::collections::BTreeMap;
2
3use anyhow::{anyhow, Result};
4use cometindex::{
5 async_trait,
6 index::{EventBatch, EventBatchContext},
7 sqlx, AppView, ContextualizedEvent, PgTransaction,
8};
9
10use penumbra_sdk_app::genesis::Content;
11use penumbra_sdk_asset::asset;
12use penumbra_sdk_num::Amount;
13use penumbra_sdk_proto::{core::component::stake::v1 as pb, event::ProtoEvent};
14use penumbra_sdk_stake::{
15 validator::{self, Validator},
16 IdentityKey,
17};
18
19use crate::parsing::parse_content;
20
/// Indexer view over the staking validator set.
///
/// The type carries no state of its own; its `AppView` implementation (named
/// `"stake/validator_set"`) keeps the `stake_validator_set` database table in
/// sync with the staking events it receives.
#[derive(Debug)]
pub struct ValidatorSet {}
23
24impl ValidatorSet {
25 async fn index_event(
26 &self,
27 dbtx: &mut PgTransaction<'_>,
28 event: ContextualizedEvent<'_>,
29 ) -> Result<(), anyhow::Error> {
30 match event.event.kind.as_str() {
31 "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => {
32 let pe = pb::EventValidatorDefinitionUpload::from_event(event.as_ref())?;
33 let val = Validator::try_from(
34 pe.validator
35 .ok_or_else(|| anyhow!("missing validator in event"))?,
36 )?;
37
38 handle_upload(dbtx, val).await?;
39 }
40 "penumbra.core.component.stake.v1.EventDelegate" => {
41 let pe = pb::EventDelegate::from_event(event.as_ref())?;
42 let ik = IdentityKey::try_from(
43 pe.identity_key
44 .ok_or_else(|| anyhow!("missing ik in event"))?,
45 )?;
46 let amount = Amount::try_from(
47 pe.amount
48 .ok_or_else(|| anyhow!("missing amount in event"))?,
49 )?;
50
51 handle_delegate(dbtx, ik, amount).await?;
52 }
53 "penumbra.core.component.stake.v1.EventUndelegate" => {
54 let pe = pb::EventUndelegate::from_event(event.as_ref())?;
55 let ik = IdentityKey::try_from(
56 pe.identity_key
57 .ok_or_else(|| anyhow!("missing ik in event"))?,
58 )?;
59 let amount = Amount::try_from(
60 pe.amount
61 .ok_or_else(|| anyhow!("missing amount in event"))?,
62 )?;
63 handle_undelegate(dbtx, ik, amount).await?;
64 }
65 "penumbra.core.component.stake.v1.EventValidatorVotingPowerChange" => {
66 let pe = pb::EventValidatorVotingPowerChange::from_event(event.as_ref())?;
67 let ik = IdentityKey::try_from(
68 pe.identity_key
69 .ok_or_else(|| anyhow!("missing ik in event"))?,
70 )?;
71 let voting_power = Amount::try_from(
72 pe.voting_power
73 .ok_or_else(|| anyhow!("missing amount in event"))?,
74 )?;
75 handle_voting_power_change(dbtx, ik, voting_power).await?;
76 }
77 "penumbra.core.component.stake.v1.EventValidatorStateChange" => {
78 let pe = pb::EventValidatorStateChange::from_event(event.as_ref())?;
79 let ik = IdentityKey::try_from(
80 pe.identity_key
81 .ok_or_else(|| anyhow!("missing ik in event"))?,
82 )?;
83 let state = validator::State::try_from(
84 pe.state.ok_or_else(|| anyhow!("missing state in event"))?,
85 )?;
86 handle_validator_state_change(dbtx, ik, state).await?;
87 }
88 "penumbra.core.component.stake.v1.EventValidatorBondingStateChange" => {
89 let pe = pb::EventValidatorBondingStateChange::from_event(event.as_ref())?;
90 let ik = IdentityKey::try_from(
91 pe.identity_key
92 .ok_or_else(|| anyhow!("missing ik in event"))?,
93 )?;
94 let bonding_state = validator::BondingState::try_from(
95 pe.bonding_state
96 .ok_or_else(|| anyhow!("missing bonding_state in event"))?,
97 )?;
98 handle_validator_bonding_state_change(dbtx, ik, bonding_state).await?;
99 }
100 _ => {}
101 }
102
103 Ok(())
104 }
105}
106
107#[async_trait]
108impl AppView for ValidatorSet {
109 async fn init_chain(
110 &self,
111 dbtx: &mut PgTransaction,
112 app_state: &serde_json::Value,
113 ) -> Result<(), anyhow::Error> {
114 sqlx::query(
115 "CREATE TABLE stake_validator_set (
119 id SERIAL PRIMARY KEY,
120 ik TEXT NOT NULL,
121 name TEXT NOT NULL,
122 definition TEXT NOT NULL,
123 voting_power BIGINT NOT NULL,
124 queued_delegations BIGINT NOT NULL,
125 queued_undelegations BIGINT NOT NULL,
126 validator_state TEXT NOT NULL,
127 bonding_state TEXT NOT NULL
128 );",
129 )
130 .execute(dbtx.as_mut())
131 .await?;
132
133 sqlx::query("CREATE UNIQUE INDEX idx_stake_validator_set_ik ON stake_validator_set(ik);")
134 .execute(dbtx.as_mut())
135 .await?;
136
137 add_genesis_validators(dbtx, &parse_content(app_state.clone())?).await?;
138 Ok(())
139 }
140
141 fn name(&self) -> String {
142 "stake/validator_set".to_string()
143 }
144
145 async fn index_batch(
146 &self,
147 dbtx: &mut PgTransaction,
148 batch: EventBatch,
149 _ctx: EventBatchContext,
150 ) -> Result<(), anyhow::Error> {
151 for event in batch.events() {
152 self.index_event(dbtx, event).await?;
153 }
154 Ok(())
155 }
156}
157
158async fn add_genesis_validators<'a>(dbtx: &mut PgTransaction<'a>, content: &Content) -> Result<()> {
159 let mut allos = BTreeMap::<asset::Id, Amount>::new();
163 for allo in &content.shielded_pool_content.allocations {
164 let value = allo.value();
165 let sum = allos.entry(value.asset_id).or_default();
166 *sum = sum
167 .checked_add(&value.amount)
168 .ok_or_else(|| anyhow::anyhow!("overflow adding genesis allos (should not happen)"))?;
169 }
170
171 for val in &content.stake_content.validators {
172 let val = Validator::try_from(val.clone())?;
175 let delegation_amount = allos.get(&val.token().id()).cloned().unwrap_or_default();
176
177 sqlx::query(
179 "INSERT INTO stake_validator_set (
180 ik, name, definition, voting_power, queued_delegations,
181 queued_undelegations, validator_state, bonding_state
182 )
183 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
184 )
185 .bind(val.identity_key.to_string())
186 .bind(val.name.clone())
187 .bind(serde_json::to_string(&val).expect("can serialize"))
188 .bind(delegation_amount.value() as i64)
189 .bind(0) .bind(0) .bind(serde_json::to_string(&validator::State::Active).unwrap()) .bind(serde_json::to_string(&validator::BondingState::Bonded).unwrap()) .execute(dbtx.as_mut())
194 .await?;
195 }
196
197 Ok(())
198}
199
200async fn handle_upload<'a>(dbtx: &mut PgTransaction<'a>, val: Validator) -> Result<()> {
201 let exists: bool =
203 sqlx::query_scalar("SELECT EXISTS(SELECT 1 FROM stake_validator_set WHERE ik = $1)")
204 .bind(&val.identity_key.to_string())
205 .fetch_one(dbtx.as_mut())
206 .await?;
207
208 if exists {
209 sqlx::query(
211 "UPDATE stake_validator_set SET
212 name = $2,
213 definition = $3
214 WHERE ik = $1",
215 )
216 .bind(val.identity_key.to_string())
217 .bind(val.name.clone())
218 .bind(serde_json::to_string(&val).expect("can serialize"))
219 .execute(dbtx.as_mut())
220 .await?;
221 } else {
222 sqlx::query(
224 "INSERT INTO stake_validator_set (
225 ik, name, definition, voting_power, queued_delegations,
226 queued_undelegations, validator_state, bonding_state
227 )
228 VALUES ($1, $2, $3, 0, 0, 0, $4, $5)",
229 )
230 .bind(val.identity_key.to_string())
231 .bind(val.name.clone())
232 .bind(serde_json::to_string(&val).expect("can serialize"))
233 .bind(serde_json::to_string(&validator::State::Defined).expect("can serialize")) .bind(serde_json::to_string(&validator::BondingState::Unbonded).expect("can serialize")) .execute(dbtx.as_mut())
236 .await?;
237 }
238
239 Ok(())
240}
241
242async fn handle_delegate<'a>(
243 dbtx: &mut PgTransaction<'a>,
244 ik: IdentityKey,
245 amount: Amount,
246) -> Result<()> {
247 let rows_affected = sqlx::query(
249 "UPDATE stake_validator_set
250 SET
251 queued_delegations = queued_delegations + $2
252 WHERE ik = $1",
253 )
254 .bind(ik.to_string())
255 .bind(amount.value() as i64)
256 .execute(dbtx.as_mut())
257 .await?
258 .rows_affected();
259
260 if rows_affected == 0 {
262 anyhow::bail!("No validator found with the given identity key");
263 }
264
265 Ok(())
266}
267
268async fn handle_undelegate<'a>(
269 dbtx: &mut PgTransaction<'a>,
270 ik: IdentityKey,
271 amount: Amount,
272) -> Result<()> {
273 let rows_affected = sqlx::query(
275 "UPDATE stake_validator_set
276 SET
277 queued_undelegations = queued_undelegations + $2
278 WHERE ik = $1",
279 )
280 .bind(ik.to_string())
281 .bind(amount.value() as i64)
282 .execute(dbtx.as_mut())
283 .await?
284 .rows_affected();
285
286 if rows_affected == 0 {
288 anyhow::bail!("No validator found with the given identity key");
289 }
290
291 Ok(())
292}
293
294async fn handle_voting_power_change<'a>(
295 dbtx: &mut PgTransaction<'a>,
296 ik: IdentityKey,
297 voting_power: Amount,
298) -> Result<()> {
299 let rows_affected = sqlx::query(
301 "UPDATE stake_validator_set
302 SET
303 voting_power = $2,
304 queued_delegations = 0,
305 queued_undelegations = 0
306 WHERE ik = $1",
307 )
308 .bind(ik.to_string())
309 .bind(voting_power.value() as i64)
310 .execute(dbtx.as_mut())
311 .await?
312 .rows_affected();
313
314 if rows_affected == 0 {
316 anyhow::bail!("No validator found with the given identity key");
317 }
318
319 Ok(())
320}
321
322async fn handle_validator_state_change<'a>(
323 dbtx: &mut PgTransaction<'a>,
324 ik: IdentityKey,
325 state: validator::State,
326) -> Result<()> {
327 let rows_affected = sqlx::query(
329 "UPDATE stake_validator_set
330 SET
331 validator_state = $2
332 WHERE ik = $1",
333 )
334 .bind(ik.to_string())
335 .bind(serde_json::to_string(&state).expect("can serialize"))
336 .execute(dbtx.as_mut())
337 .await?
338 .rows_affected();
339
340 if rows_affected == 0 {
342 anyhow::bail!("No validator found with the given identity key");
343 }
344
345 Ok(())
346}
347
348async fn handle_validator_bonding_state_change<'a>(
349 dbtx: &mut PgTransaction<'a>,
350 ik: IdentityKey,
351 bonding_state: validator::BondingState,
352) -> Result<()> {
353 let rows_affected = sqlx::query(
355 "UPDATE stake_validator_set
356 SET
357 bonding_state = $2
358 WHERE ik = $1",
359 )
360 .bind(ik.to_string())
361 .bind(serde_json::to_string(&bonding_state).expect("can serialize"))
362 .execute(dbtx.as_mut())
363 .await?
364 .rows_affected();
365
366 if rows_affected == 0 {
368 anyhow::bail!("No validator found with the given identity key");
369 }
370
371 Ok(())
372}