1use ethnum::I256;
2use std::{collections::BTreeMap, iter};
3
4use cometindex::{
5 async_trait,
6 index::{EventBatch, EventBatchContext},
7 AppView, ContextualizedEvent, PgTransaction,
8};
9use penumbra_sdk_app::genesis::Content;
10use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
11use penumbra_sdk_dex::{
12 event::{EventArbExecution, EventCandlestickData},
13 DirectedTradingPair,
14};
15use penumbra_sdk_fee::event::EventBlockFees;
16use penumbra_sdk_funding::event::EventFundingStreamReward;
17use penumbra_sdk_num::Amount;
18use penumbra_sdk_proto::event::EventDomainType;
19use penumbra_sdk_shielded_pool::event::{
20 EventInboundFungibleTokenTransfer, EventOutboundFungibleTokenRefund,
21 EventOutboundFungibleTokenTransfer,
22};
23use penumbra_sdk_stake::{
24 event::{EventDelegate, EventRateDataChange, EventUndelegate},
25 validator::Validator,
26 IdentityKey,
27};
28
29use crate::parsing::parse_content;
30
31#[derive(Debug, Clone, Copy)]
32struct ValidatorSupply {
33 um: u64,
34 rate_bps2: u64,
35}
36
37async fn modify_validator_supply(
38 dbtx: &mut PgTransaction<'_>,
39 height: u64,
40 ik: IdentityKey,
41 f: Box<dyn FnOnce(ValidatorSupply) -> anyhow::Result<ValidatorSupply> + Send + 'static>,
42) -> anyhow::Result<i64> {
43 let ik_text = ik.to_string();
44 let supply = {
45 let row: Option<(i64, i64)> = sqlx::query_as("
46 SELECT um, rate_bps2 FROM _insights_validators WHERE validator_id = $1 ORDER BY height DESC LIMIT 1
47 ").bind(&ik_text).fetch_optional(dbtx.as_mut()).await?;
48 let row = row.unwrap_or((0i64, 1_0000_0000i64));
49 ValidatorSupply {
50 um: u64::try_from(row.0)?,
51 rate_bps2: u64::try_from(row.1)?,
52 }
53 };
54 let new_supply = f(supply)?;
55 sqlx::query(
56 r#"
57 INSERT INTO _insights_validators
58 VALUES ($1, $2, $3, $4)
59 ON CONFLICT (validator_id, height) DO UPDATE SET
60 um = excluded.um,
61 rate_bps2 = excluded.rate_bps2
62 "#,
63 )
64 .bind(&ik_text)
65 .bind(i64::try_from(height)?)
66 .bind(i64::try_from(new_supply.um)?)
67 .bind(i64::try_from(new_supply.rate_bps2)?)
68 .execute(dbtx.as_mut())
69 .await?;
70 Ok(i64::try_from(new_supply.um)? - i64::try_from(supply.um)?)
71}
72
73#[derive(Default, Debug, Clone, Copy)]
74struct Supply {
75 total: u64,
76 staked: u64,
77 price: Option<f64>,
78}
79
80async fn modify_supply(
81 dbtx: &mut PgTransaction<'_>,
82 height: u64,
83 price_numeraire: Option<asset::Id>,
84 f: Box<dyn FnOnce(Supply) -> anyhow::Result<Supply> + Send + 'static>,
85) -> anyhow::Result<()> {
86 let supply: Supply = {
87 let row: Option<(i64, i64, Option<f64>)> = sqlx::query_as(
88 "SELECT total, staked, price FROM insights_supply ORDER BY HEIGHT DESC LIMIT 1",
89 )
90 .fetch_optional(dbtx.as_mut())
91 .await?;
92 row.map(|(total, staked, price)| {
93 anyhow::Result::<_>::Ok(Supply {
94 total: total.try_into()?,
95 staked: staked.try_into()?,
96 price,
97 })
98 })
99 .transpose()?
100 .unwrap_or_default()
101 };
102 let supply = f(supply)?;
103 sqlx::query(
104 r#"
105 INSERT INTO
106 insights_supply(height, total, staked, price, price_numeraire_asset_id)
107 VALUES ($1, $2, $3, $5, $4)
108 ON CONFLICT (height) DO UPDATE SET
109 total = excluded.total,
110 staked = excluded.staked,
111 price = excluded.price,
112 price_numeraire_asset_id = excluded.price_numeraire_asset_id
113 "#,
114 )
115 .bind(i64::try_from(height)?)
116 .bind(i64::try_from(supply.total)?)
117 .bind(i64::try_from(supply.staked)?)
118 .bind(price_numeraire.map(|x| x.to_bytes()))
119 .bind(supply.price)
120 .execute(dbtx.as_mut())
121 .await?;
122 Ok(())
123}
124
125#[derive(Debug, Clone, Copy, PartialEq)]
126enum DepositorExisted {
127 Yes,
128 No,
129}
130
131async fn register_depositor(
132 dbtx: &mut PgTransaction<'_>,
133 asset_id: asset::Id,
134 address: &str,
135) -> anyhow::Result<DepositorExisted> {
136 let exists: bool = sqlx::query_scalar(
137 "SELECT EXISTS (SELECT 1 FROM _insights_shielded_pool_depositors WHERE asset_id = $1 AND address = $2)",
138 )
139 .bind(asset_id.to_bytes())
140 .bind(address)
141 .fetch_one(dbtx.as_mut())
142 .await?;
143 if exists {
144 return Ok(DepositorExisted::Yes);
145 }
146 sqlx::query("INSERT INTO _insights_shielded_pool_depositors VALUES ($1, $2)")
147 .bind(asset_id.to_bytes())
148 .bind(address)
149 .execute(dbtx.as_mut())
150 .await?;
151 Ok(DepositorExisted::No)
152}
153
154async fn asset_flow(
155 dbtx: &mut PgTransaction<'_>,
156 asset_id: asset::Id,
157 height: u64,
158 flow: I256,
159 refund: bool,
160 depositor_existed: DepositorExisted,
161) -> anyhow::Result<()> {
162 let asset_pool: Option<(String, String, i32)> = sqlx::query_as("SELECT total_value, current_value, unique_depositors FROM insights_shielded_pool WHERE asset_id = $1 ORDER BY height DESC LIMIT 1").bind(asset_id.to_bytes()).fetch_optional(dbtx.as_mut()).await?;
163 let mut asset_pool = asset_pool
164 .map(|(t, c, u)| {
165 anyhow::Result::<(I256, I256, i32)>::Ok((
166 I256::from_str_radix(&t, 10)?,
167 I256::from_str_radix(&c, 10)?,
168 u,
169 ))
170 })
171 .transpose()?
172 .unwrap_or((I256::ZERO, I256::ZERO, 0i32));
173 asset_pool.0 += if refund {
174 I256::ZERO
175 } else {
176 flow.max(I256::ZERO)
177 };
178 asset_pool.1 += flow;
179 asset_pool.2 += match depositor_existed {
180 DepositorExisted::Yes => 0,
181 DepositorExisted::No => 1,
182 };
183 sqlx::query(
184 r#"
185 INSERT INTO insights_shielded_pool
186 VALUES ($1, $2, $3, $4, $5)
187 ON CONFLICT (asset_id, height) DO UPDATE SET
188 total_value = excluded.total_value,
189 current_value = excluded.current_value,
190 unique_depositors = excluded.unique_depositors
191 "#,
192 )
193 .bind(asset_id.to_bytes())
194 .bind(i64::try_from(height)?)
195 .bind(asset_pool.0.to_string())
196 .bind(asset_pool.1.to_string())
197 .bind(asset_pool.2)
198 .execute(dbtx.as_mut())
199 .await?;
200 Ok(())
201}
202
203#[derive(Debug)]
204pub struct Component {
205 price_numeraire: Option<asset::Id>,
206}
207
208impl Component {
209 pub fn new(price_numeraire: Option<asset::Id>) -> Self {
211 Self { price_numeraire }
212 }
213}
214
215async fn add_genesis_native_token_allocation_supply<'a>(
217 dbtx: &mut PgTransaction<'a>,
218 content: &Content,
219) -> anyhow::Result<()> {
220 fn content_mints(content: &Content) -> BTreeMap<asset::Id, Amount> {
221 let community_pool_mint = iter::once((
222 *STAKING_TOKEN_ASSET_ID,
223 content.community_pool_content.initial_balance.amount,
224 ));
225 let allocation_mints = content
226 .shielded_pool_content
227 .allocations
228 .iter()
229 .map(|allocation| {
230 let value = allocation.value();
231 (value.asset_id, value.amount)
232 });
233
234 let mut out = BTreeMap::new();
235 for (id, amount) in community_pool_mint.chain(allocation_mints) {
236 out.entry(id).and_modify(|x| *x += amount).or_insert(amount);
237 }
238 out
239 }
240
241 let mints = content_mints(content);
242
243 let unstaked = u64::try_from(
244 mints
245 .get(&*STAKING_TOKEN_ASSET_ID)
246 .copied()
247 .unwrap_or_default()
248 .value(),
249 )?;
250
251 let mut staked = 0u64;
252 for val in &content.stake_content.validators {
254 let val = Validator::try_from(val.clone())?;
255 let delegation_amount: u64 = mints
256 .get(&val.token().id())
257 .cloned()
258 .unwrap_or_default()
259 .value()
260 .try_into()?;
261 staked += delegation_amount;
262 modify_validator_supply(
263 dbtx,
264 0,
265 val.identity_key,
266 Box::new(move |_| {
267 Ok(ValidatorSupply {
268 um: delegation_amount,
269 rate_bps2: 1_0000_0000,
270 })
271 }),
272 )
273 .await?;
274 }
275
276 modify_supply(
277 dbtx,
278 0,
279 None,
280 Box::new(move |_| {
281 Ok(Supply {
282 total: unstaked + staked,
283 staked,
284 price: None,
285 })
286 }),
287 )
288 .await?;
289
290 Ok(())
291}
292
293impl Component {
294 async fn index_event(
295 &self,
296 dbtx: &mut PgTransaction<'_>,
297 event: ContextualizedEvent<'_>,
298 ) -> Result<(), anyhow::Error> {
299 let height = event.block_height;
300 if let Ok(e) = EventUndelegate::try_from_event(&event.event) {
301 let delta = modify_validator_supply(
302 dbtx,
303 height,
304 e.identity_key,
305 Box::new(move |supply| {
306 Ok(ValidatorSupply {
307 um: supply.um + u64::try_from(e.amount.value()).expect(""),
308 ..supply
309 })
310 }),
311 )
312 .await?;
313 modify_supply(
314 dbtx,
315 height,
316 self.price_numeraire,
317 Box::new(move |supply| {
318 Ok(Supply {
320 staked: u64::try_from(i64::try_from(supply.staked)? + delta)?,
321 ..supply
322 })
323 }),
324 )
325 .await?;
326 } else if let Ok(e) = EventDelegate::try_from_event(&event.event) {
327 let delta = modify_validator_supply(
328 dbtx,
329 height,
330 e.identity_key,
331 Box::new(move |supply| {
332 Ok(ValidatorSupply {
333 um: supply.um + u64::try_from(e.amount.value()).expect(""),
334 ..supply
335 })
336 }),
337 )
338 .await?;
339 modify_supply(
340 dbtx,
341 height,
342 self.price_numeraire,
343 Box::new(move |supply| {
344 Ok(Supply {
345 staked: u64::try_from(i64::try_from(supply.staked)? + delta)?,
346 ..supply
347 })
348 }),
349 )
350 .await?;
351 } else if let Ok(e) = EventRateDataChange::try_from_event(&event.event) {
352 let delta = modify_validator_supply(
353 dbtx,
354 height,
355 e.identity_key,
356 Box::new(move |supply| {
357 let um = (u128::from(supply.um) * e.rate_data.validator_exchange_rate.value())
363 .checked_div(supply.rate_bps2.into())
364 .unwrap_or(0u128)
365 .try_into()?;
366 Ok(ValidatorSupply {
367 um,
368 rate_bps2: u64::try_from(e.rate_data.validator_exchange_rate.value())?,
369 })
370 }),
371 )
372 .await?;
373 modify_supply(
374 dbtx,
375 height,
376 self.price_numeraire,
377 Box::new(move |supply| {
378 Ok(Supply {
380 total: u64::try_from(i64::try_from(supply.total)? + delta)?,
381 staked: u64::try_from(i64::try_from(supply.staked)? + delta)?,
382 ..supply
383 })
384 }),
385 )
386 .await?;
387 } else if let Ok(e) = EventBlockFees::try_from_event(&event.event) {
388 let value = e.swapped_fee_total.value();
389 if value.asset_id == *STAKING_TOKEN_ASSET_ID {
390 let amount = u64::try_from(value.amount.value())?;
391 modify_supply(
394 dbtx,
395 height,
396 self.price_numeraire,
397 Box::new(move |supply| {
398 Ok(Supply {
399 total: supply.total - amount,
400 ..supply
401 })
402 }),
403 )
404 .await?;
405 }
406 } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) {
407 let input = e.swap_execution.input;
408 let output = e.swap_execution.output;
409 if input.asset_id == *STAKING_TOKEN_ASSET_ID
410 && output.asset_id == *STAKING_TOKEN_ASSET_ID
411 {
412 let profit = u64::try_from((output.amount - input.amount).value())?;
413 modify_supply(
414 dbtx,
415 height,
416 self.price_numeraire,
417 Box::new(move |supply| {
418 Ok(Supply {
419 total: supply.total - profit,
420 ..supply
421 })
422 }),
423 )
424 .await?;
425 }
426 } else if let Ok(e) = EventFundingStreamReward::try_from_event(&event.event) {
427 let amount = u64::try_from(e.reward_amount.value())?;
428 modify_supply(
429 dbtx,
430 height,
431 self.price_numeraire,
432 Box::new(move |supply| {
433 Ok(Supply {
434 total: supply.total + amount,
435 ..supply
436 })
437 }),
438 )
439 .await?;
440 } else if let Ok(e) = EventInboundFungibleTokenTransfer::try_from_event(&event.event) {
441 if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
442 let existed = register_depositor(dbtx, e.value.asset_id, &e.sender).await?;
443 let flow = I256::try_from(e.value.amount.value())?;
444 asset_flow(dbtx, e.value.asset_id, height, flow, false, existed).await?;
445 }
446 } else if let Ok(e) = EventOutboundFungibleTokenTransfer::try_from_event(&event.event) {
447 if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
448 let flow = I256::try_from(e.value.amount.value())?;
449 asset_flow(
451 dbtx,
452 e.value.asset_id,
453 height,
454 -flow,
455 false,
456 DepositorExisted::No,
457 )
458 .await?;
459 }
460 } else if let Ok(e) = EventOutboundFungibleTokenRefund::try_from_event(&event.event) {
461 if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
462 let flow = I256::try_from(e.value.amount.value())?;
463 asset_flow(
465 dbtx,
466 e.value.asset_id,
467 height,
468 flow,
469 true,
470 DepositorExisted::No,
471 )
472 .await?;
473 }
474 } else if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
475 if let Some(pn) = self.price_numeraire {
476 if e.pair == DirectedTradingPair::new(*STAKING_TOKEN_ASSET_ID, pn) {
477 let price = e.stick.close;
478 modify_supply(
479 dbtx,
480 height,
481 self.price_numeraire,
482 Box::new(move |supply| {
483 Ok(Supply {
484 price: Some(price),
485 ..supply
486 })
487 }),
488 )
489 .await?;
490 }
491 }
492 }
493 tracing::debug!(?event, "unrecognized event");
494 Ok(())
495 }
496}
497
498#[async_trait]
499impl AppView for Component {
500 async fn init_chain(
501 &self,
502 dbtx: &mut PgTransaction,
503 app_state: &serde_json::Value,
504 ) -> Result<(), anyhow::Error> {
505 for statement in include_str!("schema.sql").split(";") {
506 sqlx::query(statement).execute(dbtx.as_mut()).await?;
507 }
508
509 add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?)
513 .await?;
514 Ok(())
515 }
516
517 fn name(&self) -> String {
518 "insights".to_string()
519 }
520
521 async fn index_batch(
522 &self,
523 dbtx: &mut PgTransaction,
524 batch: EventBatch,
525 _ctx: EventBatchContext,
526 ) -> Result<(), anyhow::Error> {
527 for event in batch.events() {
528 self.index_event(dbtx, event).await?;
529 }
530 Ok(())
531 }
532}