1use anyhow::{anyhow, Context};
2use ethnum::I256;
3use std::{collections::BTreeMap, iter, ops::Div as _};
4
5use cometindex::{
6 async_trait,
7 index::{EventBatch, EventBatchContext},
8 AppView, ContextualizedEvent, PgTransaction,
9};
10use penumbra_sdk_app::genesis::Content;
11use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
12use penumbra_sdk_dex::{
13 event::{EventArbExecution, EventCandlestickData},
14 DirectedTradingPair,
15};
16use penumbra_sdk_fee::event::EventBlockFees;
17use penumbra_sdk_funding::event::EventFundingStreamReward;
18use penumbra_sdk_num::Amount;
19use penumbra_sdk_proto::event::EventDomainType;
20use penumbra_sdk_shielded_pool::event::{
21 EventInboundFungibleTokenTransfer, EventOutboundFungibleTokenRefund,
22 EventOutboundFungibleTokenTransfer,
23};
24use penumbra_sdk_stake::{
25 event::{EventDelegate, EventRateDataChange, EventUndelegate},
26 validator::Validator,
27 IdentityKey,
28};
29
30use crate::parsing::parse_content;
31
32fn convert_ceil(amount: u64, inv_rate_bps2: u64) -> anyhow::Result<u64> {
33 Ok(u64::try_from(
34 (u128::from(amount) * 1_0000_0000).div_ceil(u128::from(inv_rate_bps2)),
35 )?)
36}
37
/// Per-validator supply state, as stored in the `_insights_validators` table.
#[derive(Clone, Copy, Debug)]
struct ValidatorSupply {
    /// Amount held for this validator, before the exchange rate is applied
    /// (presumably delegation-token micro-units — TODO confirm the unit).
    del_um: u64,
    /// Exchange rate as a fixed-point integer scaled by `1_0000_0000`.
    rate_bps2: u64,
}
43
44async fn modify_validator_supply<T>(
45 dbtx: &mut PgTransaction<'_>,
46 height: u64,
47 ik: IdentityKey,
48 f: impl FnOnce(ValidatorSupply) -> anyhow::Result<(T, ValidatorSupply)>,
49) -> anyhow::Result<T> {
50 let ik_text = ik.to_string();
51 let supply = {
52 let row: Option<(i64, i64)> = sqlx::query_as("
53 SELECT del_um, rate_bps2 FROM _insights_validators WHERE validator_id = $1 ORDER BY height DESC LIMIT 1
54 ").bind(&ik_text).fetch_optional(dbtx.as_mut()).await?;
55 let row = row.unwrap_or((0i64, 1_0000_0000i64));
56 ValidatorSupply {
57 del_um: u64::try_from(row.0)?,
58 rate_bps2: u64::try_from(row.1)?,
59 }
60 };
61 let (out, new_supply) = f(supply)?;
62 sqlx::query(
63 r#"
64 INSERT INTO _insights_validators
65 VALUES ($1, $2, $3, $4)
66 ON CONFLICT (validator_id, height) DO UPDATE SET
67 del_um = excluded.del_um,
68 rate_bps2 = excluded.rate_bps2
69 "#,
70 )
71 .bind(&ik_text)
72 .bind(i64::try_from(height)?)
73 .bind(i64::try_from(new_supply.del_um)?)
74 .bind(i64::try_from(new_supply.rate_bps2)?)
75 .execute(dbtx.as_mut())
76 .await?;
77 Ok(out)
78}
79
/// Chain-wide supply figures, as stored in the `insights_supply` table.
#[derive(Clone, Copy, Debug, Default)]
struct Supply {
    /// Total supply.
    total: u64,
    /// Portion of the supply that is staked.
    staked: u64,
    /// Last recorded price, if one has been recorded.
    price: Option<f64>,
}
86
87async fn modify_supply(
88 dbtx: &mut PgTransaction<'_>,
89 height: u64,
90 price_numeraire: Option<asset::Id>,
91 f: Box<dyn FnOnce(Supply) -> anyhow::Result<Supply> + Send + 'static>,
92) -> anyhow::Result<()> {
93 let supply: Supply = {
94 let row: Option<(i64, i64, Option<f64>)> = sqlx::query_as(
95 "SELECT total, staked, price FROM insights_supply ORDER BY HEIGHT DESC LIMIT 1",
96 )
97 .fetch_optional(dbtx.as_mut())
98 .await?;
99 row.map(|(total, staked, price)| {
100 anyhow::Result::<_>::Ok(Supply {
101 total: total.try_into()?,
102 staked: staked.try_into()?,
103 price,
104 })
105 })
106 .transpose()?
107 .unwrap_or_default()
108 };
109 let supply = f(supply)?;
110 sqlx::query(
111 r#"
112 INSERT INTO
113 insights_supply(height, total, staked, price, price_numeraire_asset_id)
114 VALUES ($1, $2, $3, $5, $4)
115 ON CONFLICT (height) DO UPDATE SET
116 total = excluded.total,
117 staked = excluded.staked,
118 price = excluded.price,
119 price_numeraire_asset_id = excluded.price_numeraire_asset_id
120 "#,
121 )
122 .bind(i64::try_from(height)?)
123 .bind(i64::try_from(supply.total)?)
124 .bind(i64::try_from(supply.staked)?)
125 .bind(price_numeraire.map(|x| x.to_bytes()))
126 .bind(supply.price)
127 .execute(dbtx.as_mut())
128 .await?;
129 Ok(())
130}
131
/// Whether an (asset, address) pair was already recorded as a depositor before the
/// current registration attempt.
#[derive(Clone, Copy, Debug, PartialEq)]
enum DepositorExisted {
    /// The pair was already recorded.
    Yes,
    /// The pair was not recorded before.
    No,
}
137
138async fn register_depositor(
139 dbtx: &mut PgTransaction<'_>,
140 asset_id: asset::Id,
141 address: &str,
142) -> anyhow::Result<DepositorExisted> {
143 let exists: bool = sqlx::query_scalar(
144 "SELECT EXISTS (SELECT 1 FROM _insights_shielded_pool_depositors WHERE asset_id = $1 AND address = $2)",
145 )
146 .bind(asset_id.to_bytes())
147 .bind(address)
148 .fetch_one(dbtx.as_mut())
149 .await?;
150 if exists {
151 return Ok(DepositorExisted::Yes);
152 }
153 sqlx::query("INSERT INTO _insights_shielded_pool_depositors VALUES ($1, $2)")
154 .bind(asset_id.to_bytes())
155 .bind(address)
156 .execute(dbtx.as_mut())
157 .await?;
158 Ok(DepositorExisted::No)
159}
160
161async fn asset_flow(
162 dbtx: &mut PgTransaction<'_>,
163 asset_id: asset::Id,
164 height: u64,
165 flow: I256,
166 refund: bool,
167 depositor_existed: DepositorExisted,
168) -> anyhow::Result<()> {
169 let asset_pool: Option<(String, String, i32)> = sqlx::query_as("SELECT total_value, current_value, unique_depositors FROM insights_shielded_pool WHERE asset_id = $1 ORDER BY height DESC LIMIT 1").bind(asset_id.to_bytes()).fetch_optional(dbtx.as_mut()).await?;
170 let mut asset_pool = asset_pool
171 .map(|(t, c, u)| {
172 anyhow::Result::<(I256, I256, i32)>::Ok((
173 I256::from_str_radix(&t, 10)?,
174 I256::from_str_radix(&c, 10)?,
175 u,
176 ))
177 })
178 .transpose()?
179 .unwrap_or((I256::ZERO, I256::ZERO, 0i32));
180 asset_pool.0 += if refund {
181 I256::ZERO
182 } else {
183 flow.max(I256::ZERO)
184 };
185 asset_pool.1 += flow;
186 asset_pool.2 += match depositor_existed {
187 DepositorExisted::Yes => 0,
188 DepositorExisted::No => 1,
189 };
190 sqlx::query(
191 r#"
192 INSERT INTO insights_shielded_pool
193 VALUES ($1, $2, $3, $4, $5)
194 ON CONFLICT (asset_id, height) DO UPDATE SET
195 total_value = excluded.total_value,
196 current_value = excluded.current_value,
197 unique_depositors = excluded.unique_depositors
198 "#,
199 )
200 .bind(asset_id.to_bytes())
201 .bind(i64::try_from(height)?)
202 .bind(asset_pool.0.to_string())
203 .bind(asset_pool.1.to_string())
204 .bind(asset_pool.2)
205 .execute(dbtx.as_mut())
206 .await?;
207 Ok(())
208}
209
210#[derive(Debug)]
211pub struct Component {
212 price_numeraire: Option<asset::Id>,
213}
214
215impl Component {
216 pub fn new(price_numeraire: Option<asset::Id>) -> Self {
218 Self { price_numeraire }
219 }
220}
221
222async fn add_genesis_native_token_allocation_supply<'a>(
224 dbtx: &mut PgTransaction<'a>,
225 content: &Content,
226) -> anyhow::Result<()> {
227 fn content_mints(content: &Content) -> BTreeMap<asset::Id, Amount> {
228 let community_pool_mint = iter::once((
229 *STAKING_TOKEN_ASSET_ID,
230 content.community_pool_content.initial_balance.amount,
231 ));
232 let allocation_mints = content
233 .shielded_pool_content
234 .allocations
235 .iter()
236 .map(|allocation| {
237 let value = allocation.value();
238 (value.asset_id, value.amount)
239 });
240
241 let mut out = BTreeMap::new();
242 for (id, amount) in community_pool_mint.chain(allocation_mints) {
243 out.entry(id).and_modify(|x| *x += amount).or_insert(amount);
244 }
245 out
246 }
247
248 let mints = content_mints(content);
249
250 let unstaked = u64::try_from(
251 mints
252 .get(&*STAKING_TOKEN_ASSET_ID)
253 .copied()
254 .unwrap_or_default()
255 .value(),
256 )?;
257
258 let mut staked = 0u64;
259 for val in &content.stake_content.validators {
261 let val = Validator::try_from(val.clone())?;
262 let delegation_amount: u64 = mints
263 .get(&val.token().id())
264 .cloned()
265 .unwrap_or_default()
266 .value()
267 .try_into()?;
268 staked += delegation_amount;
269 modify_validator_supply(dbtx, 0, val.identity_key, move |_| {
270 Ok((
271 (),
272 ValidatorSupply {
273 del_um: delegation_amount,
274 rate_bps2: 1_0000_0000,
275 },
276 ))
277 })
278 .await?;
279 }
280
281 modify_supply(
282 dbtx,
283 0,
284 None,
285 Box::new(move |_| {
286 Ok(Supply {
287 total: unstaked + staked,
288 staked,
289 price: None,
290 })
291 }),
292 )
293 .await?;
294
295 Ok(())
296}
297
298impl Component {
299 async fn index_event(
300 &self,
301 dbtx: &mut PgTransaction<'_>,
302 event: ContextualizedEvent<'_>,
303 ) -> Result<(), anyhow::Error> {
304 let height = event.block_height;
305 if let Ok(e) = EventUndelegate::try_from_event(&event.event) {
306 let amount = u64::try_from(e.amount.value())
307 .context("I-000-002: undelegation amount not u64")?;
308 modify_validator_supply(dbtx, height, e.identity_key, move |supply| {
311 Ok((
312 (),
313 ValidatorSupply {
314 del_um: supply
315 .del_um
316 .checked_sub(convert_ceil(amount, supply.rate_bps2)?)
317 .ok_or(anyhow!(
318 "I-000-005: underflow of validator supply for {}",
319 e.identity_key
320 ))?,
321 ..supply
322 },
323 ))
324 })
325 .await?;
326 modify_supply(
327 dbtx,
328 height,
329 self.price_numeraire,
330 Box::new(move |supply| {
331 Ok(Supply {
333 staked: supply
334 .staked
335 .checked_sub(amount)
336 .ok_or(anyhow!("I-000-001: underflow of staked supply"))?,
337 ..supply
338 })
339 }),
340 )
341 .await?;
342 } else if let Ok(e) = EventDelegate::try_from_event(&event.event) {
343 let amount = u64::try_from(e.amount.value())
344 .context("I-000-003: undelegation amount not u64")?;
345 modify_validator_supply(
346 dbtx,
347 height,
348 e.identity_key,
349 move |supply| {
351 Ok((
352 (),
353 ValidatorSupply {
354 del_um: supply
355 .del_um
356 .checked_add(convert_ceil(amount, supply.rate_bps2)?)
357 .ok_or(anyhow!(
358 "I-000-006: overflow of validator supply for {}",
359 e.identity_key
360 ))?,
361 ..supply
362 },
363 ))
364 },
365 )
366 .await?;
367 modify_supply(
368 dbtx,
369 height,
370 self.price_numeraire,
371 Box::new(move |supply| {
372 Ok(Supply {
373 staked: supply
374 .staked
375 .checked_add(amount)
376 .ok_or(anyhow!("I-000-004: overflow of staked supply"))?,
377 ..supply
378 })
379 }),
380 )
381 .await?;
382 } else if let Ok(e) = EventRateDataChange::try_from_event(&event.event) {
383 let delta = modify_validator_supply(dbtx, height, e.identity_key, move |supply| {
384 let rate_bps2 = u64::try_from(e.rate_data.validator_exchange_rate.value())?;
385 let old_um =
386 (i128::from(supply.del_um) * i128::from(supply.rate_bps2)).div(1_0000_0000);
387 let new_um = (i128::from(supply.del_um) * i128::from(rate_bps2)).div(1_0000_0000);
388 Ok((
389 i64::try_from(new_um - old_um)?,
390 ValidatorSupply {
391 rate_bps2,
392 del_um: supply.del_um,
393 },
394 ))
395 })
396 .await?;
397 modify_supply(
398 dbtx,
399 height,
400 self.price_numeraire,
401 Box::new(move |supply| {
402 Ok(Supply {
404 total: u64::try_from(i64::try_from(supply.total)? + delta)?,
405 staked: u64::try_from(i64::try_from(supply.staked)? + delta)?,
406 ..supply
407 })
408 }),
409 )
410 .await?;
411 } else if let Ok(e) = EventBlockFees::try_from_event(&event.event) {
412 let value = e.swapped_fee_total.value();
413 if value.asset_id == *STAKING_TOKEN_ASSET_ID {
414 let amount = u64::try_from(value.amount.value())?;
415 modify_supply(
418 dbtx,
419 height,
420 self.price_numeraire,
421 Box::new(move |supply| {
422 Ok(Supply {
423 total: supply.total - amount,
424 ..supply
425 })
426 }),
427 )
428 .await?;
429 }
430 } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) {
431 let input = e.swap_execution.input;
432 let output = e.swap_execution.output;
433 if input.asset_id == *STAKING_TOKEN_ASSET_ID
434 && output.asset_id == *STAKING_TOKEN_ASSET_ID
435 {
436 let profit = u64::try_from((output.amount - input.amount).value())?;
437 modify_supply(
438 dbtx,
439 height,
440 self.price_numeraire,
441 Box::new(move |supply| {
442 Ok(Supply {
443 total: supply.total - profit,
444 ..supply
445 })
446 }),
447 )
448 .await?;
449 }
450 } else if let Ok(e) = EventFundingStreamReward::try_from_event(&event.event) {
451 let amount = u64::try_from(e.reward_amount.value())?;
452 modify_supply(
453 dbtx,
454 height,
455 self.price_numeraire,
456 Box::new(move |supply| {
457 Ok(Supply {
458 total: supply.total + amount,
459 ..supply
460 })
461 }),
462 )
463 .await?;
464 } else if let Ok(e) = EventInboundFungibleTokenTransfer::try_from_event(&event.event) {
465 if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
466 let existed = register_depositor(dbtx, e.value.asset_id, &e.sender).await?;
467 let flow = I256::try_from(e.value.amount.value())?;
468 asset_flow(dbtx, e.value.asset_id, height, flow, false, existed).await?;
469 }
470 } else if let Ok(e) = EventOutboundFungibleTokenTransfer::try_from_event(&event.event) {
471 if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
472 let flow = I256::try_from(e.value.amount.value())?;
473 asset_flow(
475 dbtx,
476 e.value.asset_id,
477 height,
478 -flow,
479 false,
480 DepositorExisted::No,
481 )
482 .await?;
483 }
484 } else if let Ok(e) = EventOutboundFungibleTokenRefund::try_from_event(&event.event) {
485 if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
486 let flow = I256::try_from(e.value.amount.value())?;
487 asset_flow(
489 dbtx,
490 e.value.asset_id,
491 height,
492 flow,
493 true,
494 DepositorExisted::No,
495 )
496 .await?;
497 }
498 } else if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
499 if let Some(pn) = self.price_numeraire {
500 if e.pair == DirectedTradingPair::new(*STAKING_TOKEN_ASSET_ID, pn) {
501 let price = e.stick.close;
502 modify_supply(
503 dbtx,
504 height,
505 self.price_numeraire,
506 Box::new(move |supply| {
507 Ok(Supply {
508 price: Some(price),
509 ..supply
510 })
511 }),
512 )
513 .await?;
514 }
515 }
516 }
517 tracing::debug!(?event, "unrecognized event");
518 Ok(())
519 }
520}
521
522#[async_trait]
523impl AppView for Component {
524 async fn init_chain(
525 &self,
526 dbtx: &mut PgTransaction,
527 app_state: &serde_json::Value,
528 ) -> Result<(), anyhow::Error> {
529 for statement in include_str!("schema.sql").split(";") {
530 sqlx::query(statement).execute(dbtx.as_mut()).await?;
531 }
532
533 add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?)
537 .await?;
538 Ok(())
539 }
540
541 fn name(&self) -> String {
542 "insights".to_string()
543 }
544
545 async fn index_batch(
546 &self,
547 dbtx: &mut PgTransaction,
548 batch: EventBatch,
549 _ctx: EventBatchContext,
550 ) -> Result<(), anyhow::Error> {
551 for event in batch.events() {
552 self.index_event(dbtx, event).await?;
553 }
554 Ok(())
555 }
556}