pindexer/insights/
mod.rs

1use ethnum::I256;
2use std::{collections::BTreeMap, iter};
3
4use cometindex::{
5    async_trait,
6    index::{EventBatch, EventBatchContext},
7    AppView, ContextualizedEvent, PgTransaction,
8};
9use penumbra_sdk_app::genesis::Content;
10use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
11use penumbra_sdk_dex::{
12    event::{EventArbExecution, EventCandlestickData},
13    DirectedTradingPair,
14};
15use penumbra_sdk_fee::event::EventBlockFees;
16use penumbra_sdk_funding::event::EventFundingStreamReward;
17use penumbra_sdk_num::Amount;
18use penumbra_sdk_proto::event::EventDomainType;
19use penumbra_sdk_shielded_pool::event::{
20    EventInboundFungibleTokenTransfer, EventOutboundFungibleTokenRefund,
21    EventOutboundFungibleTokenTransfer,
22};
23use penumbra_sdk_stake::{
24    event::{EventDelegate, EventRateDataChange, EventUndelegate},
25    validator::Validator,
26    IdentityKey,
27};
28
29use crate::parsing::parse_content;
30
31#[derive(Debug, Clone, Copy)]
32struct ValidatorSupply {
33    um: u64,
34    rate_bps2: u64,
35}
36
37async fn modify_validator_supply(
38    dbtx: &mut PgTransaction<'_>,
39    height: u64,
40    ik: IdentityKey,
41    f: Box<dyn FnOnce(ValidatorSupply) -> anyhow::Result<ValidatorSupply> + Send + 'static>,
42) -> anyhow::Result<i64> {
43    let ik_text = ik.to_string();
44    let supply = {
45        let row: Option<(i64, i64)> = sqlx::query_as("
46            SELECT um, rate_bps2 FROM _insights_validators WHERE validator_id = $1 ORDER BY height DESC LIMIT 1
47        ").bind(&ik_text).fetch_optional(dbtx.as_mut()).await?;
48        let row = row.unwrap_or((0i64, 1_0000_0000i64));
49        ValidatorSupply {
50            um: u64::try_from(row.0)?,
51            rate_bps2: u64::try_from(row.1)?,
52        }
53    };
54    let new_supply = f(supply)?;
55    sqlx::query(
56        r#"
57        INSERT INTO _insights_validators 
58        VALUES ($1, $2, $3, $4)
59        ON CONFLICT (validator_id, height) DO UPDATE SET
60            um = excluded.um,
61            rate_bps2 = excluded.rate_bps2
62    "#,
63    )
64    .bind(&ik_text)
65    .bind(i64::try_from(height)?)
66    .bind(i64::try_from(new_supply.um)?)
67    .bind(i64::try_from(new_supply.rate_bps2)?)
68    .execute(dbtx.as_mut())
69    .await?;
70    Ok(i64::try_from(new_supply.um)? - i64::try_from(supply.um)?)
71}
72
73#[derive(Default, Debug, Clone, Copy)]
74struct Supply {
75    total: u64,
76    staked: u64,
77    price: Option<f64>,
78}
79
80async fn modify_supply(
81    dbtx: &mut PgTransaction<'_>,
82    height: u64,
83    price_numeraire: Option<asset::Id>,
84    f: Box<dyn FnOnce(Supply) -> anyhow::Result<Supply> + Send + 'static>,
85) -> anyhow::Result<()> {
86    let supply: Supply = {
87        let row: Option<(i64, i64, Option<f64>)> = sqlx::query_as(
88            "SELECT total, staked, price FROM insights_supply ORDER BY HEIGHT DESC LIMIT 1",
89        )
90        .fetch_optional(dbtx.as_mut())
91        .await?;
92        row.map(|(total, staked, price)| {
93            anyhow::Result::<_>::Ok(Supply {
94                total: total.try_into()?,
95                staked: staked.try_into()?,
96                price,
97            })
98        })
99        .transpose()?
100        .unwrap_or_default()
101    };
102    let supply = f(supply)?;
103    sqlx::query(
104        r#"
105        INSERT INTO 
106            insights_supply(height, total, staked, price, price_numeraire_asset_id) 
107            VALUES ($1, $2, $3, $5, $4)
108        ON CONFLICT (height) DO UPDATE SET
109        total = excluded.total,
110        staked = excluded.staked,
111        price = excluded.price,
112        price_numeraire_asset_id = excluded.price_numeraire_asset_id
113    "#,
114    )
115    .bind(i64::try_from(height)?)
116    .bind(i64::try_from(supply.total)?)
117    .bind(i64::try_from(supply.staked)?)
118    .bind(price_numeraire.map(|x| x.to_bytes()))
119    .bind(supply.price)
120    .execute(dbtx.as_mut())
121    .await?;
122    Ok(())
123}
124
125#[derive(Debug, Clone, Copy, PartialEq)]
126enum DepositorExisted {
127    Yes,
128    No,
129}
130
131async fn register_depositor(
132    dbtx: &mut PgTransaction<'_>,
133    asset_id: asset::Id,
134    address: &str,
135) -> anyhow::Result<DepositorExisted> {
136    let exists: bool = sqlx::query_scalar(
137        "SELECT EXISTS (SELECT 1 FROM _insights_shielded_pool_depositors WHERE asset_id = $1 AND address = $2)",
138    )
139    .bind(asset_id.to_bytes())
140    .bind(address)
141    .fetch_one(dbtx.as_mut())
142    .await?;
143    if exists {
144        return Ok(DepositorExisted::Yes);
145    }
146    sqlx::query("INSERT INTO _insights_shielded_pool_depositors VALUES ($1, $2)")
147        .bind(asset_id.to_bytes())
148        .bind(address)
149        .execute(dbtx.as_mut())
150        .await?;
151    Ok(DepositorExisted::No)
152}
153
154async fn asset_flow(
155    dbtx: &mut PgTransaction<'_>,
156    asset_id: asset::Id,
157    height: u64,
158    flow: I256,
159    refund: bool,
160    depositor_existed: DepositorExisted,
161) -> anyhow::Result<()> {
162    let asset_pool: Option<(String, String, i32)> = sqlx::query_as("SELECT total_value, current_value, unique_depositors FROM insights_shielded_pool WHERE asset_id = $1 ORDER BY height DESC LIMIT 1").bind(asset_id.to_bytes()).fetch_optional(dbtx.as_mut()).await?;
163    let mut asset_pool = asset_pool
164        .map(|(t, c, u)| {
165            anyhow::Result::<(I256, I256, i32)>::Ok((
166                I256::from_str_radix(&t, 10)?,
167                I256::from_str_radix(&c, 10)?,
168                u,
169            ))
170        })
171        .transpose()?
172        .unwrap_or((I256::ZERO, I256::ZERO, 0i32));
173    asset_pool.0 += if refund {
174        I256::ZERO
175    } else {
176        flow.max(I256::ZERO)
177    };
178    asset_pool.1 += flow;
179    asset_pool.2 += match depositor_existed {
180        DepositorExisted::Yes => 0,
181        DepositorExisted::No => 1,
182    };
183    sqlx::query(
184        r#"
185        INSERT INTO insights_shielded_pool
186        VALUES ($1, $2, $3, $4, $5)
187        ON CONFLICT (asset_id, height) DO UPDATE SET
188        total_value = excluded.total_value,
189        current_value = excluded.current_value,
190        unique_depositors = excluded.unique_depositors
191    "#,
192    )
193    .bind(asset_id.to_bytes())
194    .bind(i64::try_from(height)?)
195    .bind(asset_pool.0.to_string())
196    .bind(asset_pool.1.to_string())
197    .bind(asset_pool.2)
198    .execute(dbtx.as_mut())
199    .await?;
200    Ok(())
201}
202
203#[derive(Debug)]
204pub struct Component {
205    price_numeraire: Option<asset::Id>,
206}
207
208impl Component {
209    /// This component depends on a reference asset for the total supply price.
210    pub fn new(price_numeraire: Option<asset::Id>) -> Self {
211        Self { price_numeraire }
212    }
213}
214
215/// Add the initial native token supply.
216async fn add_genesis_native_token_allocation_supply<'a>(
217    dbtx: &mut PgTransaction<'a>,
218    content: &Content,
219) -> anyhow::Result<()> {
220    fn content_mints(content: &Content) -> BTreeMap<asset::Id, Amount> {
221        let community_pool_mint = iter::once((
222            *STAKING_TOKEN_ASSET_ID,
223            content.community_pool_content.initial_balance.amount,
224        ));
225        let allocation_mints = content
226            .shielded_pool_content
227            .allocations
228            .iter()
229            .map(|allocation| {
230                let value = allocation.value();
231                (value.asset_id, value.amount)
232            });
233
234        let mut out = BTreeMap::new();
235        for (id, amount) in community_pool_mint.chain(allocation_mints) {
236            out.entry(id).and_modify(|x| *x += amount).or_insert(amount);
237        }
238        out
239    }
240
241    let mints = content_mints(content);
242
243    let unstaked = u64::try_from(
244        mints
245            .get(&*STAKING_TOKEN_ASSET_ID)
246            .copied()
247            .unwrap_or_default()
248            .value(),
249    )?;
250
251    let mut staked = 0u64;
252    // at genesis, assume a 1:1 ratio between delegation amount and native token amount.
253    for val in &content.stake_content.validators {
254        let val = Validator::try_from(val.clone())?;
255        let delegation_amount: u64 = mints
256            .get(&val.token().id())
257            .cloned()
258            .unwrap_or_default()
259            .value()
260            .try_into()?;
261        staked += delegation_amount;
262        modify_validator_supply(
263            dbtx,
264            0,
265            val.identity_key,
266            Box::new(move |_| {
267                Ok(ValidatorSupply {
268                    um: delegation_amount,
269                    rate_bps2: 1_0000_0000,
270                })
271            }),
272        )
273        .await?;
274    }
275
276    modify_supply(
277        dbtx,
278        0,
279        None,
280        Box::new(move |_| {
281            Ok(Supply {
282                total: unstaked + staked,
283                staked,
284                price: None,
285            })
286        }),
287    )
288    .await?;
289
290    Ok(())
291}
292
293impl Component {
294    async fn index_event(
295        &self,
296        dbtx: &mut PgTransaction<'_>,
297        event: ContextualizedEvent<'_>,
298    ) -> Result<(), anyhow::Error> {
299        let height = event.block_height;
300        if let Ok(e) = EventUndelegate::try_from_event(&event.event) {
301            let delta = modify_validator_supply(
302                dbtx,
303                height,
304                e.identity_key,
305                Box::new(move |supply| {
306                    Ok(ValidatorSupply {
307                        um: supply.um + u64::try_from(e.amount.value()).expect(""),
308                        ..supply
309                    })
310                }),
311            )
312            .await?;
313            modify_supply(
314                dbtx,
315                height,
316                self.price_numeraire,
317                Box::new(move |supply| {
318                    // The amount staked has changed, but no inflation has happened.
319                    Ok(Supply {
320                        staked: u64::try_from(i64::try_from(supply.staked)? + delta)?,
321                        ..supply
322                    })
323                }),
324            )
325            .await?;
326        } else if let Ok(e) = EventDelegate::try_from_event(&event.event) {
327            let delta = modify_validator_supply(
328                dbtx,
329                height,
330                e.identity_key,
331                Box::new(move |supply| {
332                    Ok(ValidatorSupply {
333                        um: supply.um + u64::try_from(e.amount.value()).expect(""),
334                        ..supply
335                    })
336                }),
337            )
338            .await?;
339            modify_supply(
340                dbtx,
341                height,
342                self.price_numeraire,
343                Box::new(move |supply| {
344                    Ok(Supply {
345                        staked: u64::try_from(i64::try_from(supply.staked)? + delta)?,
346                        ..supply
347                    })
348                }),
349            )
350            .await?;
351        } else if let Ok(e) = EventRateDataChange::try_from_event(&event.event) {
352            let delta = modify_validator_supply(
353                dbtx,
354                height,
355                e.identity_key,
356                Box::new(move |supply| {
357                    // del_um <- um / old_exchange_rate
358                    // um <- del_um * new_exchange_rate
359                    // so
360                    // um <- um * (new_exchange_rate / old_exchange_rate)
361                    // and the bps cancel out.
362                    let um = (u128::from(supply.um) * e.rate_data.validator_exchange_rate.value())
363                        .checked_div(supply.rate_bps2.into())
364                        .unwrap_or(0u128)
365                        .try_into()?;
366                    Ok(ValidatorSupply {
367                        um,
368                        rate_bps2: u64::try_from(e.rate_data.validator_exchange_rate.value())?,
369                    })
370                }),
371            )
372            .await?;
373            modify_supply(
374                dbtx,
375                height,
376                self.price_numeraire,
377                Box::new(move |supply| {
378                    // Value has been created or destroyed!
379                    Ok(Supply {
380                        total: u64::try_from(i64::try_from(supply.total)? + delta)?,
381                        staked: u64::try_from(i64::try_from(supply.staked)? + delta)?,
382                        ..supply
383                    })
384                }),
385            )
386            .await?;
387        } else if let Ok(e) = EventBlockFees::try_from_event(&event.event) {
388            let value = e.swapped_fee_total.value();
389            if value.asset_id == *STAKING_TOKEN_ASSET_ID {
390                let amount = u64::try_from(value.amount.value())?;
391                // We consider the tip to be destroyed too, matching the current logic
392                // DRAGON: if this changes, this code should use the base fee only.
393                modify_supply(
394                    dbtx,
395                    height,
396                    self.price_numeraire,
397                    Box::new(move |supply| {
398                        Ok(Supply {
399                            total: supply.total - amount,
400                            ..supply
401                        })
402                    }),
403                )
404                .await?;
405            }
406        } else if let Ok(e) = EventArbExecution::try_from_event(&event.event) {
407            let input = e.swap_execution.input;
408            let output = e.swap_execution.output;
409            if input.asset_id == *STAKING_TOKEN_ASSET_ID
410                && output.asset_id == *STAKING_TOKEN_ASSET_ID
411            {
412                let profit = u64::try_from((output.amount - input.amount).value())?;
413                modify_supply(
414                    dbtx,
415                    height,
416                    self.price_numeraire,
417                    Box::new(move |supply| {
418                        Ok(Supply {
419                            total: supply.total - profit,
420                            ..supply
421                        })
422                    }),
423                )
424                .await?;
425            }
426        } else if let Ok(e) = EventFundingStreamReward::try_from_event(&event.event) {
427            let amount = u64::try_from(e.reward_amount.value())?;
428            modify_supply(
429                dbtx,
430                height,
431                self.price_numeraire,
432                Box::new(move |supply| {
433                    Ok(Supply {
434                        total: supply.total + amount,
435                        ..supply
436                    })
437                }),
438            )
439            .await?;
440        } else if let Ok(e) = EventInboundFungibleTokenTransfer::try_from_event(&event.event) {
441            if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
442                let existed = register_depositor(dbtx, e.value.asset_id, &e.sender).await?;
443                let flow = I256::try_from(e.value.amount.value())?;
444                asset_flow(dbtx, e.value.asset_id, height, flow, false, existed).await?;
445            }
446        } else if let Ok(e) = EventOutboundFungibleTokenTransfer::try_from_event(&event.event) {
447            if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
448                let flow = I256::try_from(e.value.amount.value())?;
449                // For outbound transfers, never increment unique count
450                asset_flow(
451                    dbtx,
452                    e.value.asset_id,
453                    height,
454                    -flow,
455                    false,
456                    DepositorExisted::No,
457                )
458                .await?;
459            }
460        } else if let Ok(e) = EventOutboundFungibleTokenRefund::try_from_event(&event.event) {
461            if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
462                let flow = I256::try_from(e.value.amount.value())?;
463                // For outbound transfers, never increment unique count.
464                asset_flow(
465                    dbtx,
466                    e.value.asset_id,
467                    height,
468                    flow,
469                    true,
470                    DepositorExisted::No,
471                )
472                .await?;
473            }
474        } else if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
475            if let Some(pn) = self.price_numeraire {
476                if e.pair == DirectedTradingPair::new(*STAKING_TOKEN_ASSET_ID, pn) {
477                    let price = e.stick.close;
478                    modify_supply(
479                        dbtx,
480                        height,
481                        self.price_numeraire,
482                        Box::new(move |supply| {
483                            Ok(Supply {
484                                price: Some(price),
485                                ..supply
486                            })
487                        }),
488                    )
489                    .await?;
490                }
491            }
492        }
493        tracing::debug!(?event, "unrecognized event");
494        Ok(())
495    }
496}
497
498#[async_trait]
499impl AppView for Component {
500    async fn init_chain(
501        &self,
502        dbtx: &mut PgTransaction,
503        app_state: &serde_json::Value,
504    ) -> Result<(), anyhow::Error> {
505        for statement in include_str!("schema.sql").split(";") {
506            sqlx::query(statement).execute(dbtx.as_mut()).await?;
507        }
508
509        // decode the initial supply from the genesis
510        // initial app state is not recomputed from events, because events are not emitted in init_chain.
511        // instead, the indexer directly parses the genesis.
512        add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?)
513            .await?;
514        Ok(())
515    }
516
517    fn name(&self) -> String {
518        "insights".to_string()
519    }
520
521    async fn index_batch(
522        &self,
523        dbtx: &mut PgTransaction,
524        batch: EventBatch,
525        _ctx: EventBatchContext,
526    ) -> Result<(), anyhow::Error> {
527        for event in batch.events() {
528            self.index_event(dbtx, event).await?;
529        }
530        Ok(())
531    }
532}