// pindexer/insights/mod.rs

1use anyhow::{anyhow, Context};
2use ethnum::I256;
3use std::{collections::BTreeMap, iter, ops::Div as _};
4
5use cometindex::{
6    async_trait,
7    index::{EventBatch, EventBatchContext},
8    AppView, ContextualizedEvent, PgTransaction,
9};
10use penumbra_sdk_app::genesis::Content;
11use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
12use penumbra_sdk_dex::{
13    event::{EventArbExecution, EventCandlestickData},
14    DirectedTradingPair,
15};
16use penumbra_sdk_fee::event::EventBlockFees;
17use penumbra_sdk_funding::event::EventFundingStreamReward;
18use penumbra_sdk_num::Amount;
19use penumbra_sdk_proto::event::EventDomainType;
20use penumbra_sdk_shielded_pool::event::{
21    EventInboundFungibleTokenTransfer, EventOutboundFungibleTokenRefund,
22    EventOutboundFungibleTokenTransfer,
23};
24use penumbra_sdk_stake::{
25    event::{EventDelegate, EventRateDataChange, EventUndelegate},
26    validator::Validator,
27    IdentityKey,
28};
29
30use crate::parsing::parse_content;
31
32fn convert_ceil(amount: u64, inv_rate_bps2: u64) -> anyhow::Result<u64> {
33    Ok(u64::try_from(
34        (u128::from(amount) * 1_0000_0000).div_ceil(u128::from(inv_rate_bps2)),
35    )?)
36}
37
/// The per-validator state tracked in the `_insights_validators` table.
#[derive(Debug, Clone, Copy)]
struct ValidatorSupply {
    /// Amount of the validator's delegation token in existence.
    del_um: u64,
    /// The validator's exchange rate; `1_0000_0000` represents a 1:1 ratio
    /// between delegation tokens and the native token.
    rate_bps2: u64,
}
43
44async fn modify_validator_supply<T>(
45    dbtx: &mut PgTransaction<'_>,
46    height: u64,
47    ik: IdentityKey,
48    f: impl FnOnce(ValidatorSupply) -> anyhow::Result<(T, ValidatorSupply)>,
49) -> anyhow::Result<T> {
50    let ik_text = ik.to_string();
51    let supply = {
52        let row: Option<(i64, i64)> = sqlx::query_as("
53            SELECT del_um, rate_bps2 FROM _insights_validators WHERE validator_id = $1 ORDER BY height DESC LIMIT 1
54        ").bind(&ik_text).fetch_optional(dbtx.as_mut()).await?;
55        let row = row.unwrap_or((0i64, 1_0000_0000i64));
56        ValidatorSupply {
57            del_um: u64::try_from(row.0)?,
58            rate_bps2: u64::try_from(row.1)?,
59        }
60    };
61    let (out, new_supply) = f(supply)?;
62    sqlx::query(
63        r#"
64        INSERT INTO _insights_validators 
65        VALUES ($1, $2, $3, $4)
66        ON CONFLICT (validator_id, height) DO UPDATE SET
67            del_um = excluded.del_um,
68            rate_bps2 = excluded.rate_bps2
69    "#,
70    )
71    .bind(&ik_text)
72    .bind(i64::try_from(height)?)
73    .bind(i64::try_from(new_supply.del_um)?)
74    .bind(i64::try_from(new_supply.rate_bps2)?)
75    .execute(dbtx.as_mut())
76    .await?;
77    Ok(out)
78}
79
/// The global native-token state tracked in the `insights_supply` table.
#[derive(Default, Debug, Clone, Copy)]
struct Supply {
    /// Total amount of the native token.
    total: u64,
    /// The portion of the native token that is staked.
    staked: u64,
    /// Last observed price of the native token in the configured numeraire, if any.
    price: Option<f64>,
}
86
87async fn modify_supply(
88    dbtx: &mut PgTransaction<'_>,
89    height: u64,
90    price_numeraire: Option<asset::Id>,
91    f: Box<dyn FnOnce(Supply) -> anyhow::Result<Supply> + Send + 'static>,
92) -> anyhow::Result<()> {
93    let supply: Supply = {
94        let row: Option<(i64, i64, Option<f64>)> = sqlx::query_as(
95            "SELECT total, staked, price FROM insights_supply ORDER BY HEIGHT DESC LIMIT 1",
96        )
97        .fetch_optional(dbtx.as_mut())
98        .await?;
99        row.map(|(total, staked, price)| {
100            anyhow::Result::<_>::Ok(Supply {
101                total: total.try_into()?,
102                staked: staked.try_into()?,
103                price,
104            })
105        })
106        .transpose()?
107        .unwrap_or_default()
108    };
109    let supply = f(supply)?;
110    sqlx::query(
111        r#"
112        INSERT INTO 
113            insights_supply(height, total, staked, price, price_numeraire_asset_id) 
114            VALUES ($1, $2, $3, $5, $4)
115        ON CONFLICT (height) DO UPDATE SET
116        total = excluded.total,
117        staked = excluded.staked,
118        price = excluded.price,
119        price_numeraire_asset_id = excluded.price_numeraire_asset_id
120    "#,
121    )
122    .bind(i64::try_from(height)?)
123    .bind(i64::try_from(supply.total)?)
124    .bind(i64::try_from(supply.staked)?)
125    .bind(price_numeraire.map(|x| x.to_bytes()))
126    .bind(supply.price)
127    .execute(dbtx.as_mut())
128    .await?;
129    Ok(())
130}
131
/// Whether an (asset, address) pair was already recorded as a depositor.
///
/// `asset_flow` increments the unique depositor count only for `No`.
#[derive(Debug, Clone, Copy, PartialEq)]
enum DepositorExisted {
    /// The depositor was already known; the unique count is left unchanged.
    Yes,
    /// The depositor is new; the unique count is incremented.
    No,
}
137
138async fn register_depositor(
139    dbtx: &mut PgTransaction<'_>,
140    asset_id: asset::Id,
141    address: &str,
142) -> anyhow::Result<DepositorExisted> {
143    let exists: bool = sqlx::query_scalar(
144        "SELECT EXISTS (SELECT 1 FROM _insights_shielded_pool_depositors WHERE asset_id = $1 AND address = $2)",
145    )
146    .bind(asset_id.to_bytes())
147    .bind(address)
148    .fetch_one(dbtx.as_mut())
149    .await?;
150    if exists {
151        return Ok(DepositorExisted::Yes);
152    }
153    sqlx::query("INSERT INTO _insights_shielded_pool_depositors VALUES ($1, $2)")
154        .bind(asset_id.to_bytes())
155        .bind(address)
156        .execute(dbtx.as_mut())
157        .await?;
158    Ok(DepositorExisted::No)
159}
160
161async fn asset_flow(
162    dbtx: &mut PgTransaction<'_>,
163    asset_id: asset::Id,
164    height: u64,
165    flow: I256,
166    refund: bool,
167    depositor_existed: DepositorExisted,
168) -> anyhow::Result<()> {
169    let asset_pool: Option<(String, String, i32)> = sqlx::query_as("SELECT total_value, current_value, unique_depositors FROM insights_shielded_pool WHERE asset_id = $1 ORDER BY height DESC LIMIT 1").bind(asset_id.to_bytes()).fetch_optional(dbtx.as_mut()).await?;
170    let mut asset_pool = asset_pool
171        .map(|(t, c, u)| {
172            anyhow::Result::<(I256, I256, i32)>::Ok((
173                I256::from_str_radix(&t, 10)?,
174                I256::from_str_radix(&c, 10)?,
175                u,
176            ))
177        })
178        .transpose()?
179        .unwrap_or((I256::ZERO, I256::ZERO, 0i32));
180    asset_pool.0 += if refund {
181        I256::ZERO
182    } else {
183        flow.max(I256::ZERO)
184    };
185    asset_pool.1 += flow;
186    asset_pool.2 += match depositor_existed {
187        DepositorExisted::Yes => 0,
188        DepositorExisted::No => 1,
189    };
190    sqlx::query(
191        r#"
192        INSERT INTO insights_shielded_pool
193        VALUES ($1, $2, $3, $4, $5)
194        ON CONFLICT (asset_id, height) DO UPDATE SET
195        total_value = excluded.total_value,
196        current_value = excluded.current_value,
197        unique_depositors = excluded.unique_depositors
198    "#,
199    )
200    .bind(asset_id.to_bytes())
201    .bind(i64::try_from(height)?)
202    .bind(asset_pool.0.to_string())
203    .bind(asset_pool.1.to_string())
204    .bind(asset_pool.2)
205    .execute(dbtx.as_mut())
206    .await?;
207    Ok(())
208}
209
/// The "insights" app view: tracks native token supply and shielded pool flows.
#[derive(Debug)]
pub struct Component {
    /// The asset the native token's price is quoted in; `None` disables price tracking.
    price_numeraire: Option<asset::Id>,
}
214
215impl Component {
216    /// This component depends on a reference asset for the total supply price.
217    pub fn new(price_numeraire: Option<asset::Id>) -> Self {
218        Self { price_numeraire }
219    }
220}
221
222/// Add the initial native token supply.
223async fn add_genesis_native_token_allocation_supply<'a>(
224    dbtx: &mut PgTransaction<'a>,
225    content: &Content,
226) -> anyhow::Result<()> {
227    fn content_mints(content: &Content) -> BTreeMap<asset::Id, Amount> {
228        let community_pool_mint = iter::once((
229            *STAKING_TOKEN_ASSET_ID,
230            content.community_pool_content.initial_balance.amount,
231        ));
232        let allocation_mints = content
233            .shielded_pool_content
234            .allocations
235            .iter()
236            .map(|allocation| {
237                let value = allocation.value();
238                (value.asset_id, value.amount)
239            });
240
241        let mut out = BTreeMap::new();
242        for (id, amount) in community_pool_mint.chain(allocation_mints) {
243            out.entry(id).and_modify(|x| *x += amount).or_insert(amount);
244        }
245        out
246    }
247
248    let mints = content_mints(content);
249
250    let unstaked = u64::try_from(
251        mints
252            .get(&*STAKING_TOKEN_ASSET_ID)
253            .copied()
254            .unwrap_or_default()
255            .value(),
256    )?;
257
258    let mut staked = 0u64;
259    // at genesis, assume a 1:1 ratio between delegation amount and native token amount.
260    for val in &content.stake_content.validators {
261        let val = Validator::try_from(val.clone())?;
262        let delegation_amount: u64 = mints
263            .get(&val.token().id())
264            .cloned()
265            .unwrap_or_default()
266            .value()
267            .try_into()?;
268        staked += delegation_amount;
269        modify_validator_supply(dbtx, 0, val.identity_key, move |_| {
270            Ok((
271                (),
272                ValidatorSupply {
273                    del_um: delegation_amount,
274                    rate_bps2: 1_0000_0000,
275                },
276            ))
277        })
278        .await?;
279    }
280
281    modify_supply(
282        dbtx,
283        0,
284        None,
285        Box::new(move |_| {
286            Ok(Supply {
287                total: unstaked + staked,
288                staked,
289                price: None,
290            })
291        }),
292    )
293    .await?;
294
295    Ok(())
296}
297
298impl Component {
299    async fn index_event(
300        &self,
301        dbtx: &mut PgTransaction<'_>,
302        event: ContextualizedEvent<'_>,
303    ) -> Result<(), anyhow::Error> {
304        let height = event.block_height;
305        if let Ok(e) = EventUndelegate::try_from_event(&event.event) {
306            let amount = u64::try_from(e.amount.value())
307                .context("I-000-002: undelegation amount not u64")?;
308            // When the delegated um was undelegated, conversion was applied to round down,
309            // so when converting back, we round up.
310            modify_validator_supply(dbtx, height, e.identity_key, move |supply| {
311                Ok((
312                    (),
313                    ValidatorSupply {
314                        del_um: supply
315                            .del_um
316                            .checked_sub(convert_ceil(amount, supply.rate_bps2)?)
317                            .ok_or(anyhow!(
318                                "I-000-005: underflow of validator supply for {}",
319                                e.identity_key
320                            ))?,
321                        ..supply
322                    },
323                ))
324            })
325            .await?;
326            modify_supply(
327                dbtx,
328                height,
329                self.price_numeraire,
330                Box::new(move |supply| {
331                    // The amount staked has changed, but no inflation has happened.
332                    Ok(Supply {
333                        staked: supply
334                            .staked
335                            .checked_sub(amount)
336                            .ok_or(anyhow!("I-000-001: underflow of staked supply"))?,
337                        ..supply
338                    })
339                }),
340            )
341            .await?;
342        } else if let Ok(e) = EventDelegate::try_from_event(&event.event) {
343            let amount = u64::try_from(e.amount.value())
344                .context("I-000-003: undelegation amount not u64")?;
345            modify_validator_supply(
346                dbtx,
347                height,
348                e.identity_key,
349                // When converting, we round down so that the user gets *less*.
350                move |supply| {
351                    Ok((
352                        (),
353                        ValidatorSupply {
354                            del_um: supply
355                                .del_um
356                                .checked_add(convert_ceil(amount, supply.rate_bps2)?)
357                                .ok_or(anyhow!(
358                                    "I-000-006: overflow of validator supply for {}",
359                                    e.identity_key
360                                ))?,
361                            ..supply
362                        },
363                    ))
364                },
365            )
366            .await?;
367            modify_supply(
368                dbtx,
369                height,
370                self.price_numeraire,
371                Box::new(move |supply| {
372                    Ok(Supply {
373                        staked: supply
374                            .staked
375                            .checked_add(amount)
376                            .ok_or(anyhow!("I-000-004: overflow of staked supply"))?,
377                        ..supply
378                    })
379                }),
380            )
381            .await?;
382        } else if let Ok(e) = EventRateDataChange::try_from_event(&event.event) {
383            let delta = modify_validator_supply(dbtx, height, e.identity_key, move |supply| {
384                let rate_bps2 = u64::try_from(e.rate_data.validator_exchange_rate.value())?;
385                let old_um =
386                    (i128::from(supply.del_um) * i128::from(supply.rate_bps2)).div(1_0000_0000);
387                let new_um = (i128::from(supply.del_um) * i128::from(rate_bps2)).div(1_0000_0000);
388                Ok((
389                    i64::try_from(new_um - old_um)?,
390                    ValidatorSupply {
391                        rate_bps2,
392                        del_um: supply.del_um,
393                    },
394                ))
395            })
396            .await?;
397            modify_supply(
398                dbtx,
399                height,
400                self.price_numeraire,
401                Box::new(move |supply| {
402                    // Value has been created or destroyed!
403                    Ok(Supply {
404                        total: u64::try_from(i64::try_from(supply.total)? + delta)?,
405                        staked: u64::try_from(i64::try_from(supply.staked)? + delta)?,
406                        ..supply
407                    })
408                }),
409            )
410            .await?;
411        } else if let Ok(e) = EventBlockFees::try_from_event(&event.event) {
412            let value = e.swapped_fee_total.value();
413            if value.asset_id == *STAKING_TOKEN_ASSET_ID {
414                let amount = u64::try_from(value.amount.value())?;
415                // We consider the tip to be destroyed too, matching the current logic
416                // DRAGON: if this changes, this code should use the base fee only.
417                modify_supply(
418                    dbtx,
419                    height,
420                    self.price_numeraire,
421                    Box::new(move |supply| {
422                        Ok(Supply {
423                            total: supply.total - amount,
424                            ..supply
425                        })
426                    }),
427                )
428                .await?;
429            }
430        } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) {
431            let input = e.swap_execution.input;
432            let output = e.swap_execution.output;
433            if input.asset_id == *STAKING_TOKEN_ASSET_ID
434                && output.asset_id == *STAKING_TOKEN_ASSET_ID
435            {
436                let profit = u64::try_from((output.amount - input.amount).value())?;
437                modify_supply(
438                    dbtx,
439                    height,
440                    self.price_numeraire,
441                    Box::new(move |supply| {
442                        Ok(Supply {
443                            total: supply.total - profit,
444                            ..supply
445                        })
446                    }),
447                )
448                .await?;
449            }
450        } else if let Ok(e) = EventFundingStreamReward::try_from_event(&event.event) {
451            let amount = u64::try_from(e.reward_amount.value())?;
452            modify_supply(
453                dbtx,
454                height,
455                self.price_numeraire,
456                Box::new(move |supply| {
457                    Ok(Supply {
458                        total: supply.total + amount,
459                        ..supply
460                    })
461                }),
462            )
463            .await?;
464        } else if let Ok(e) = EventInboundFungibleTokenTransfer::try_from_event(&event.event) {
465            if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
466                let existed = register_depositor(dbtx, e.value.asset_id, &e.sender).await?;
467                let flow = I256::try_from(e.value.amount.value())?;
468                asset_flow(dbtx, e.value.asset_id, height, flow, false, existed).await?;
469            }
470        } else if let Ok(e) = EventOutboundFungibleTokenTransfer::try_from_event(&event.event) {
471            if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
472                let flow = I256::try_from(e.value.amount.value())?;
473                // For outbound transfers, never increment unique count
474                asset_flow(
475                    dbtx,
476                    e.value.asset_id,
477                    height,
478                    -flow,
479                    false,
480                    DepositorExisted::No,
481                )
482                .await?;
483            }
484        } else if let Ok(e) = EventOutboundFungibleTokenRefund::try_from_event(&event.event) {
485            if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
486                let flow = I256::try_from(e.value.amount.value())?;
487                // For outbound transfers, never increment unique count.
488                asset_flow(
489                    dbtx,
490                    e.value.asset_id,
491                    height,
492                    flow,
493                    true,
494                    DepositorExisted::No,
495                )
496                .await?;
497            }
498        } else if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
499            if let Some(pn) = self.price_numeraire {
500                if e.pair == DirectedTradingPair::new(*STAKING_TOKEN_ASSET_ID, pn) {
501                    let price = e.stick.close;
502                    modify_supply(
503                        dbtx,
504                        height,
505                        self.price_numeraire,
506                        Box::new(move |supply| {
507                            Ok(Supply {
508                                price: Some(price),
509                                ..supply
510                            })
511                        }),
512                    )
513                    .await?;
514                }
515            }
516        }
517        tracing::debug!(?event, "unrecognized event");
518        Ok(())
519    }
520}
521
522#[async_trait]
523impl AppView for Component {
524    async fn init_chain(
525        &self,
526        dbtx: &mut PgTransaction,
527        app_state: &serde_json::Value,
528    ) -> Result<(), anyhow::Error> {
529        for statement in include_str!("schema.sql").split(";") {
530            sqlx::query(statement).execute(dbtx.as_mut()).await?;
531        }
532
533        // decode the initial supply from the genesis
534        // initial app state is not recomputed from events, because events are not emitted in init_chain.
535        // instead, the indexer directly parses the genesis.
536        add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?)
537            .await?;
538        Ok(())
539    }
540
541    fn name(&self) -> String {
542        "insights".to_string()
543    }
544
545    async fn index_batch(
546        &self,
547        dbtx: &mut PgTransaction,
548        batch: EventBatch,
549        _ctx: EventBatchContext,
550    ) -> Result<(), anyhow::Error> {
551        for event in batch.events() {
552            self.index_event(dbtx, event).await?;
553        }
554        Ok(())
555    }
556}