pindexer/lqt/
mod.rs

1use anyhow::anyhow;
2use cometindex::{
3    async_trait,
4    index::{EventBatch, EventBatchContext, Version},
5    sqlx, AppView, ContextualizedEvent, PgTransaction,
6};
7use penumbra_sdk_app::{event::EventAppParametersChange, genesis::Content, params::AppParameters};
8use penumbra_sdk_asset::asset;
9use penumbra_sdk_dex::event::EventLqtPositionVolume;
10use penumbra_sdk_dex::lp::position;
11use penumbra_sdk_distributions::{event::EventLqtPoolSizeIncrease, DistributionsParameters};
12use penumbra_sdk_funding::{
13    event::{EventLqtDelegatorReward, EventLqtPositionReward, EventLqtVote},
14    FundingParameters,
15};
16use penumbra_sdk_keys::Address;
17use penumbra_sdk_num::Amount;
18use penumbra_sdk_proto::event::EventDomainType;
19use penumbra_sdk_sct::{event::EventEpochRoot, params::SctParameters};
20use sqlx::types::BigDecimal;
21
22use crate::parsing::parse_content;
23
24struct Parameters {
25    funding: FundingParameters,
26    sct: SctParameters,
27    distribution: DistributionsParameters,
28}
29
30impl From<Content> for Parameters {
31    fn from(value: Content) -> Self {
32        Self {
33            funding: value.funding_content.funding_params,
34            sct: value.sct_content.sct_params,
35            distribution: value.distributions_content.distributions_params,
36        }
37    }
38}
39
40impl From<AppParameters> for Parameters {
41    fn from(value: AppParameters) -> Self {
42        Self {
43            funding: value.funding_params,
44            sct: value.sct_params,
45            distribution: value.distributions_params,
46        }
47    }
48}
49
50mod _params {
51    use super::*;
52
53    /// Set the params post genesis.
54    pub async fn set_initial(
55        dbtx: &mut PgTransaction<'_>,
56        params: Parameters,
57    ) -> anyhow::Result<()> {
58        set_epoch(dbtx, 0, params).await
59    }
60
61    // This will be used once we integrate the event for parameter changes.
62    #[allow(dead_code)]
63    /// Set the params for a given epoch.
64    pub async fn set_epoch(
65        dbtx: &mut PgTransaction<'_>,
66        epoch: u64,
67        params: Parameters,
68    ) -> anyhow::Result<()> {
69        let gauge_threshold = params.funding.liquidity_tournament.gauge_threshold;
70        let delegator_share = params.funding.liquidity_tournament.delegator_share;
71        let epoch_duration = params.sct.epoch_duration;
72        let rewards_per_block = params.distribution.liquidity_tournament_incentive_per_block;
73        sqlx::query(
74            "
75            INSERT INTO lqt._params
76            VALUES ($1, $2::NUMERIC / 100, $3::NUMERIC / 100, $4, $5)
77            ON CONFLICT (epoch)
78            DO UPDATE SET
79                delegator_share = EXCLUDED.delegator_share,
80                gauge_threshold = EXCLUDED.gauge_threshold,
81                epoch_duration = EXCLUDED.epoch_duration,
82                rewards_per_block = EXCLUDED.rewards_per_block
83        ",
84        )
85        .bind(i64::try_from(epoch)?)
86        .bind(i64::try_from(delegator_share.to_percent())?)
87        .bind(i64::try_from(gauge_threshold.to_percent())?)
88        .bind(i64::try_from(epoch_duration)?)
89        .bind(i64::try_from(rewards_per_block)?)
90        .execute(dbtx.as_mut())
91        .await?;
92        Ok(())
93    }
94}
95
96mod _meta {
97    use penumbra_sdk_sct::event::EventBlockRoot;
98
99    use super::*;
100
101    const DEFAULT_BLOCK_TIME_SECS: f64 = 5.0;
102
103    pub async fn update_with_batch(
104        dbtx: &mut PgTransaction<'_>,
105        batch: &EventBatch,
106    ) -> anyhow::Result<()> {
107        let block_time_s = {
108            let mut timestamps = batch.events().filter_map(|e| {
109                EventBlockRoot::try_from_event(e.event)
110                    .ok()
111                    .map(|x| (x.height, x.timestamp_seconds as f64))
112            });
113            let first = timestamps.next();
114            let last = timestamps.last();
115            match (first, last) {
116                (Some((h0, t0)), Some((h1, t1))) => (t1 - t0) / (h1 - h0) as f64,
117                _ => DEFAULT_BLOCK_TIME_SECS,
118            }
119        };
120        let current_height = batch
121            .events_by_block()
122            .last()
123            .map(|x| x.height())
124            .ok_or(anyhow!("expected there to be at least one block in events"))?;
125        sqlx::query(
126            "
127            INSERT INTO lqt._meta VALUES (0, $1, $2)
128            ON CONFLICT (rowid)
129            DO UPDATE SET
130                current_height = EXCLUDED.current_height,
131                block_time_s = EXCLUDED.block_time_s
132        ",
133        )
134        .bind(i64::try_from(current_height)?)
135        .bind(block_time_s)
136        .execute(dbtx.as_mut())
137        .await?;
138        Ok(())
139    }
140}
141
142mod _epoch_info {
143    use super::*;
144
145    pub async fn start_epoch(
146        dbtx: &mut PgTransaction<'_>,
147        epoch: u64,
148        height: u64,
149    ) -> anyhow::Result<()> {
150        sqlx::query(
151            "INSERT INTO lqt._epoch_info VALUES ($1, $2, $2, NULL, 0) ON CONFLICT DO NOTHING",
152        )
153        .bind(i64::try_from(epoch)?)
154        .bind(i64::try_from(height)?)
155        .execute(dbtx.as_mut())
156        .await?;
157        Ok(())
158    }
159
160    pub async fn end_epoch(
161        dbtx: &mut PgTransaction<'_>,
162        epoch: u64,
163        height: u64,
164    ) -> anyhow::Result<()> {
165        sqlx::query(
166            "UPDATE lqt._epoch_info SET end_block = $2, updated_block = $2 WHERE epoch = $1",
167        )
168        .bind(i64::try_from(epoch)?)
169        .bind(i64::try_from(height)?)
170        .execute(dbtx.as_mut())
171        .await?;
172        Ok(())
173    }
174
175    pub async fn set_rewards_for_epoch(
176        dbtx: &mut PgTransaction<'_>,
177        epoch: u64,
178        height: u64,
179        amount: Amount,
180    ) -> anyhow::Result<()> {
181        sqlx::query("UPDATE lqt._epoch_info SET available_rewards = $2::NUMERIC, updated_block = $3 WHERE epoch = $1")
182            .bind(i64::try_from(epoch)?)
183            .bind(BigDecimal::from(amount.value()))
184            .bind(i64::try_from(height)?)
185            .execute(dbtx.as_mut())
186            .await?;
187        Ok(())
188    }
189
190    pub async fn current(dbtx: &mut PgTransaction<'_>) -> anyhow::Result<u64> {
191        let out: i32 =
192            sqlx::query_scalar("SELECT epoch FROM lqt._epoch_info ORDER BY epoch DESC LIMIT 1")
193                .fetch_one(dbtx.as_mut())
194                .await?;
195        Ok(u64::try_from(out)?)
196    }
197}
198
199mod _finished_epochs {
200    use super::*;
201
202    /// Declare that a particular epoch has ended.
203    ///
204    /// This is idempotent.
205    pub async fn declare_finished(dbtx: &mut PgTransaction<'_>, epoch: u64) -> anyhow::Result<()> {
206        sqlx::query("INSERT INTO lqt._finished_epochs VALUES ($1) ON CONFLICT DO NOTHING")
207            .bind(i64::try_from(epoch)?)
208            .execute(dbtx.as_mut())
209            .await?;
210        Ok(())
211    }
212}
213
214mod _delegator_rewards {
215    use super::*;
216
217    /// Add a reward to a particular delegator in some epoch.
218    pub async fn add(
219        dbtx: &mut PgTransaction<'_>,
220        epoch: u64,
221        addr: Address,
222        amount: Amount,
223    ) -> anyhow::Result<()> {
224        sqlx::query(
225            "
226            INSERT INTO lqt._delegator_rewards      
227            VALUES ($1, $2, $3)
228            ON CONFLICT (epoch, address)
229            DO UPDATE SET
230                amount = lqt._delegator_rewards.amount + EXCLUDED.amount
231        ",
232        )
233        .bind(i64::try_from(epoch)?)
234        .bind(addr.to_vec())
235        .bind(BigDecimal::from(amount.value()))
236        .execute(dbtx.as_mut())
237        .await?;
238        Ok(())
239    }
240}
241
242mod _lp_rewards {
243    use super::*;
244
245    /// Add a reward to a given LP in some epoch.
246    ///
247    /// This assumes that the LP has already been created.
248    pub async fn add_reward(
249        dbtx: &mut PgTransaction<'_>,
250        epoch: u64,
251        position_id: position::Id,
252        amount: Amount,
253    ) -> anyhow::Result<()> {
254        sqlx::query(
255            "
256            UPDATE lqt._lp_rewards       
257            SET amount = amount + $3
258            WHERE epoch = $1 AND position_id = $2
259        ",
260        )
261        .bind(i64::try_from(epoch)?)
262        .bind(position_id.0)
263        .bind(BigDecimal::from(amount.value()))
264        .execute(dbtx.as_mut())
265        .await?;
266        Ok(())
267    }
268
269    /// Add flows to some LP's state.
270    pub async fn add_flows(
271        dbtx: &mut PgTransaction<'_>,
272        epoch: u64,
273        position_id: position::Id,
274        other_asset: asset::Id,
275        um_volume: Amount,
276        asset_volume: Amount,
277        um_fees: Amount,
278        asset_fees: Amount,
279        points: Amount,
280    ) -> anyhow::Result<()> {
281        sqlx::query(
282            "
283            INSERT INTO lqt._lp_rewards      
284            VALUES ($1, $2, $3, 0, 1, $4, $5, $6, $7, $8)
285            ON CONFLICT (epoch, position_id)
286            DO UPDATE SET
287                asset_id = EXCLUDED.asset_id,
288                executions = lqt._lp_rewards.executions + 1,
289                um_volume = lqt._lp_rewards.um_volume + EXCLUDED.um_volume,
290                asset_volume = lqt._lp_rewards.asset_volume + EXCLUDED.asset_volume,
291                um_fees = lqt._lp_rewards.um_fees + EXCLUDED.um_fees,
292                asset_fees = lqt._lp_rewards.asset_fees + EXCLUDED.asset_fees,
293                points = lqt._lp_rewards.points + EXCLUDED.points
294        ",
295        )
296        .bind(i64::try_from(epoch)?)
297        .bind(position_id.0)
298        .bind(other_asset.to_bytes())
299        .bind(BigDecimal::from(um_volume.value()))
300        .bind(BigDecimal::from(asset_volume.value()))
301        .bind(BigDecimal::from(um_fees.value()))
302        .bind(BigDecimal::from(asset_fees.value()))
303        .bind(BigDecimal::from(points.value()))
304        .execute(dbtx.as_mut())
305        .await?;
306        Ok(())
307    }
308}
309
310mod _votes {
311    use super::*;
312
313    /// Add a vote in a given epoch.
314    pub async fn add(
315        dbtx: &mut PgTransaction<'_>,
316        epoch: u64,
317        power: Amount,
318        asset_id: asset::Id,
319        addr: Address,
320    ) -> anyhow::Result<()> {
321        sqlx::query(
322            "
323            INSERT INTO lqt._votes
324            VALUES (DEFAULT, $1, $2, $3, $4)      
325        ",
326        )
327        .bind(i64::try_from(epoch)?)
328        .bind(BigDecimal::from(power.value()))
329        .bind(asset_id.to_bytes())
330        .bind(addr.to_vec())
331        .execute(dbtx.as_mut())
332        .await?;
333        Ok(())
334    }
335}
336
/// App view that indexes LQT (liquidity tournament) events into the tables
/// of the `lqt` schema.
///
/// The type carries no state of its own; everything it tracks lives in the
/// database.
#[derive(Debug)]
pub struct Lqt {}
339
340impl Lqt {
341    async fn index_event(
342        &self,
343        dbtx: &mut PgTransaction<'_>,
344        event: ContextualizedEvent<'_>,
345    ) -> anyhow::Result<()> {
346        if let Ok(e) = EventLqtVote::try_from_event(&event.event) {
347            _votes::add(
348                dbtx,
349                e.epoch_index,
350                e.voting_power,
351                e.incentivized_asset_id,
352                e.rewards_recipient,
353            )
354            .await?;
355        } else if let Ok(e) = EventLqtDelegatorReward::try_from_event(&event.event) {
356            _delegator_rewards::add(dbtx, e.epoch_index, e.address, e.reward_amount).await?;
357        } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
358            _lp_rewards::add_reward(dbtx, e.epoch_index, e.position_id, e.reward_amount).await?;
359        } else if let Ok(e) = EventLqtPositionVolume::try_from_event(&event.event) {
360            _lp_rewards::add_flows(
361                dbtx,
362                e.epoch_index,
363                e.position_id,
364                e.asset_id,
365                e.staking_token_in,
366                e.asset_in,
367                e.staking_fees,
368                e.asset_fees,
369                e.volume,
370            )
371            .await?;
372        } else if let Ok(e) = EventEpochRoot::try_from_event(&event.event) {
373            _finished_epochs::declare_finished(dbtx, e.index).await?;
374            _epoch_info::end_epoch(dbtx, e.index, event.block_height).await?;
375            _epoch_info::start_epoch(dbtx, e.index + 1, event.block_height + 1).await?;
376        } else if let Ok(e) = EventLqtPoolSizeIncrease::try_from_event(&event.event) {
377            _epoch_info::set_rewards_for_epoch(
378                dbtx,
379                e.epoch_index,
380                event.block_height,
381                e.new_total,
382            )
383            .await?;
384        } else if let Ok(e) = EventAppParametersChange::try_from_event(&event.event) {
385            let current = _epoch_info::current(dbtx).await?;
386            _params::set_epoch(dbtx, current, e.new_parameters.into()).await?;
387        }
388        Ok(())
389    }
390}
391
392#[async_trait]
393impl AppView for Lqt {
394    async fn init_chain(
395        &self,
396        dbtx: &mut PgTransaction,
397        app_state: &serde_json::Value,
398    ) -> Result<(), anyhow::Error> {
399        let content = parse_content(app_state.clone())?;
400        _params::set_initial(dbtx, content.into()).await?;
401        Ok(())
402    }
403
404    fn name(&self) -> String {
405        "lqt".to_string()
406    }
407
408    fn version(&self) -> Version {
409        Version::with_major(4)
410    }
411
412    async fn reset(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
413        for statement in include_str!("reset.sql").split(";") {
414            sqlx::query(statement).execute(dbtx.as_mut()).await?;
415        }
416        Ok(())
417    }
418
419    async fn on_startup(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
420        for statement in include_str!("schema.sql").split(";") {
421            sqlx::query(statement).execute(dbtx.as_mut()).await?;
422        }
423        _epoch_info::start_epoch(dbtx, 1, 1).await?;
424        Ok(())
425    }
426
427    async fn index_batch(
428        &self,
429        dbtx: &mut PgTransaction,
430        batch: EventBatch,
431        _ctx: EventBatchContext,
432    ) -> Result<(), anyhow::Error> {
433        _meta::update_with_batch(dbtx, &batch).await?;
434        for event in batch.events() {
435            self.index_event(dbtx, event).await?;
436        }
437        Ok(())
438    }
439}