pindexer/lqt/
mod.rs

1use anyhow::anyhow;
2use cometindex::{
3    async_trait,
4    index::{EventBatch, EventBatchContext, Version},
5    sqlx, AppView, ContextualizedEvent, PgTransaction,
6};
7use penumbra_sdk_app::{event::EventAppParametersChange, genesis::Content, params::AppParameters};
8use penumbra_sdk_asset::asset;
9use penumbra_sdk_dex::event::EventLqtPositionVolume;
10use penumbra_sdk_dex::lp::position;
11use penumbra_sdk_distributions::{event::EventLqtPoolSizeIncrease, DistributionsParameters};
12use penumbra_sdk_funding::{
13    event::{EventLqtDelegatorReward, EventLqtPositionReward, EventLqtVote},
14    FundingParameters,
15};
16use penumbra_sdk_keys::Address;
17use penumbra_sdk_num::Amount;
18use penumbra_sdk_proto::event::EventDomainType;
19use penumbra_sdk_sct::{event::EventEpochRoot, params::SctParameters};
20use sqlx::types::BigDecimal;
21
22use crate::parsing::parse_content;
23
24struct Parameters {
25    funding: FundingParameters,
26    sct: SctParameters,
27    distribution: DistributionsParameters,
28}
29
30impl From<Content> for Parameters {
31    fn from(value: Content) -> Self {
32        Self {
33            funding: value.funding_content.funding_params,
34            sct: value.sct_content.sct_params,
35            distribution: value.distributions_content.distributions_params,
36        }
37    }
38}
39
40impl From<AppParameters> for Parameters {
41    fn from(value: AppParameters) -> Self {
42        Self {
43            funding: value.funding_params,
44            sct: value.sct_params,
45            distribution: value.distributions_params,
46        }
47    }
48}
49
50mod _params {
51    use super::*;
52
53    /// Set the params post genesis.
54    pub async fn set_initial(
55        dbtx: &mut PgTransaction<'_>,
56        params: Parameters,
57    ) -> anyhow::Result<()> {
58        set_epoch(dbtx, 0, params).await
59    }
60
61    // This will be used once we integrate the event for parameter changes.
62    #[allow(dead_code)]
63    /// Set the params for a given epoch.
64    pub async fn set_epoch(
65        dbtx: &mut PgTransaction<'_>,
66        epoch: u64,
67        params: Parameters,
68    ) -> anyhow::Result<()> {
69        let gauge_threshold = params.funding.liquidity_tournament.gauge_threshold;
70        let delegator_share = params.funding.liquidity_tournament.delegator_share;
71        let epoch_duration = params.sct.epoch_duration;
72        let rewards_per_block = params.distribution.liquidity_tournament_incentive_per_block;
73        sqlx::query(
74            "
75            INSERT INTO lqt._params
76            VALUES ($1, $2::NUMERIC / 100, $3::NUMERIC / 100, $4, $5)
77            ON CONFLICT (epoch)
78            DO UPDATE SET
79                delegator_share = EXCLUDED.delegator_share,
80                gauge_threshold = EXCLUDED.gauge_threshold,
81                epoch_duration = EXCLUDED.epoch_duration,
82                rewards_per_block = EXCLUDED.rewards_per_block
83        ",
84        )
85        .bind(i64::try_from(epoch)?)
86        .bind(i64::try_from(delegator_share.to_percent())?)
87        .bind(i64::try_from(gauge_threshold.to_percent())?)
88        .bind(i64::try_from(epoch_duration)?)
89        .bind(i64::try_from(rewards_per_block)?)
90        .execute(dbtx.as_mut())
91        .await?;
92        Ok(())
93    }
94}
95
96mod _meta {
97    use super::*;
98
99    pub async fn update_with_batch(
100        dbtx: &mut PgTransaction<'_>,
101        batch: &EventBatch,
102        block_time_s: f64,
103    ) -> anyhow::Result<()> {
104        let current_height = batch
105            .events_by_block()
106            .last()
107            .map(|x| x.height())
108            .ok_or(anyhow!("expected there to be at least one block in events"))?;
109        sqlx::query(
110            "
111            INSERT INTO lqt._meta VALUES (0, $1, $2)
112            ON CONFLICT (rowid)
113            DO UPDATE SET
114                current_height = EXCLUDED.current_height,
115                block_time_s = EXCLUDED.block_time_s
116        ",
117        )
118        .bind(i64::try_from(current_height)?)
119        .bind(block_time_s)
120        .execute(dbtx.as_mut())
121        .await?;
122        Ok(())
123    }
124}
125
126mod _epoch_info {
127    use super::*;
128
129    pub async fn start_epoch(
130        dbtx: &mut PgTransaction<'_>,
131        epoch: u64,
132        height: u64,
133    ) -> anyhow::Result<()> {
134        sqlx::query(
135            "INSERT INTO lqt._epoch_info VALUES ($1, $2, $2, NULL, 0) ON CONFLICT DO NOTHING",
136        )
137        .bind(i64::try_from(epoch)?)
138        .bind(i64::try_from(height)?)
139        .execute(dbtx.as_mut())
140        .await?;
141        Ok(())
142    }
143
144    pub async fn end_epoch(
145        dbtx: &mut PgTransaction<'_>,
146        epoch: u64,
147        height: u64,
148    ) -> anyhow::Result<()> {
149        sqlx::query(
150            "UPDATE lqt._epoch_info SET end_block = $2, updated_block = $2 WHERE epoch = $1",
151        )
152        .bind(i64::try_from(epoch)?)
153        .bind(i64::try_from(height)?)
154        .execute(dbtx.as_mut())
155        .await?;
156        Ok(())
157    }
158
159    pub async fn set_rewards_for_epoch(
160        dbtx: &mut PgTransaction<'_>,
161        epoch: u64,
162        height: u64,
163        amount: Amount,
164    ) -> anyhow::Result<()> {
165        sqlx::query("UPDATE lqt._epoch_info SET available_rewards = $2::NUMERIC, updated_block = $3 WHERE epoch = $1")
166            .bind(i64::try_from(epoch)?)
167            .bind(BigDecimal::from(amount.value()))
168            .bind(i64::try_from(height)?)
169            .execute(dbtx.as_mut())
170            .await?;
171        Ok(())
172    }
173
174    pub async fn current(dbtx: &mut PgTransaction<'_>) -> anyhow::Result<u64> {
175        let out: i32 =
176            sqlx::query_scalar("SELECT epoch FROM lqt._epoch_info ORDER BY epoch DESC LIMIT 1")
177                .fetch_one(dbtx.as_mut())
178                .await?;
179        Ok(u64::try_from(out)?)
180    }
181}
182
183mod _finished_epochs {
184    use super::*;
185
186    /// Declare that a particular epoch has ended.
187    ///
188    /// This is idempotent.
189    pub async fn declare_finished(dbtx: &mut PgTransaction<'_>, epoch: u64) -> anyhow::Result<()> {
190        sqlx::query("INSERT INTO lqt._finished_epochs VALUES ($1) ON CONFLICT DO NOTHING")
191            .bind(i64::try_from(epoch)?)
192            .execute(dbtx.as_mut())
193            .await?;
194        Ok(())
195    }
196}
197
198mod _delegator_rewards {
199    use super::*;
200
201    /// Add a reward to a particular delegator in some epoch.
202    pub async fn add(
203        dbtx: &mut PgTransaction<'_>,
204        epoch: u64,
205        addr: Address,
206        amount: Amount,
207    ) -> anyhow::Result<()> {
208        sqlx::query(
209            "
210            INSERT INTO lqt._delegator_rewards      
211            VALUES ($1, $2, $3)
212            ON CONFLICT (epoch, address)
213            DO UPDATE SET
214                amount = lqt._delegator_rewards.amount + EXCLUDED.amount
215        ",
216        )
217        .bind(i64::try_from(epoch)?)
218        .bind(addr.to_vec())
219        .bind(BigDecimal::from(amount.value()))
220        .execute(dbtx.as_mut())
221        .await?;
222        Ok(())
223    }
224}
225
226mod _lp_rewards {
227    use super::*;
228
229    /// Add a reward to a given LP in some epoch.
230    ///
231    /// This assumes that the LP has already been created.
232    pub async fn add_reward(
233        dbtx: &mut PgTransaction<'_>,
234        epoch: u64,
235        position_id: position::Id,
236        amount: Amount,
237    ) -> anyhow::Result<()> {
238        sqlx::query(
239            "
240            UPDATE lqt._lp_rewards       
241            SET amount = amount + $3
242            WHERE epoch = $1 AND position_id = $2
243        ",
244        )
245        .bind(i64::try_from(epoch)?)
246        .bind(position_id.0)
247        .bind(BigDecimal::from(amount.value()))
248        .execute(dbtx.as_mut())
249        .await?;
250        Ok(())
251    }
252
253    /// Add flows to some LP's state.
254    pub async fn add_flows(
255        dbtx: &mut PgTransaction<'_>,
256        epoch: u64,
257        position_id: position::Id,
258        other_asset: asset::Id,
259        um_volume: Amount,
260        asset_volume: Amount,
261        um_fees: Amount,
262        asset_fees: Amount,
263        points: Amount,
264    ) -> anyhow::Result<()> {
265        sqlx::query(
266            "
267            INSERT INTO lqt._lp_rewards      
268            VALUES ($1, $2, $3, 0, 1, $4, $5, $6, $7, $8)
269            ON CONFLICT (epoch, position_id)
270            DO UPDATE SET
271                asset_id = EXCLUDED.asset_id,
272                executions = lqt._lp_rewards.executions + 1,
273                um_volume = lqt._lp_rewards.um_volume + EXCLUDED.um_volume,
274                asset_volume = lqt._lp_rewards.asset_volume + EXCLUDED.asset_volume,
275                um_fees = lqt._lp_rewards.um_fees + EXCLUDED.um_fees,
276                asset_fees = lqt._lp_rewards.asset_fees + EXCLUDED.asset_fees,
277                points = lqt._lp_rewards.points + EXCLUDED.points
278        ",
279        )
280        .bind(i64::try_from(epoch)?)
281        .bind(position_id.0)
282        .bind(other_asset.to_bytes())
283        .bind(BigDecimal::from(um_volume.value()))
284        .bind(BigDecimal::from(asset_volume.value()))
285        .bind(BigDecimal::from(um_fees.value()))
286        .bind(BigDecimal::from(asset_fees.value()))
287        .bind(BigDecimal::from(points.value()))
288        .execute(dbtx.as_mut())
289        .await?;
290        Ok(())
291    }
292}
293
294mod _votes {
295    use super::*;
296
297    /// Add a vote in a given epoch.
298    pub async fn add(
299        dbtx: &mut PgTransaction<'_>,
300        epoch: u64,
301        power: Amount,
302        asset_id: asset::Id,
303        addr: Address,
304    ) -> anyhow::Result<()> {
305        sqlx::query(
306            "
307            INSERT INTO lqt._votes
308            VALUES (DEFAULT, $1, $2, $3, $4)      
309        ",
310        )
311        .bind(i64::try_from(epoch)?)
312        .bind(BigDecimal::from(power.value()))
313        .bind(asset_id.to_bytes())
314        .bind(addr.to_vec())
315        .execute(dbtx.as_mut())
316        .await?;
317        Ok(())
318    }
319}
320
321#[derive(Debug)]
322pub struct Lqt {
323    block_time_s: f64,
324}
325
326impl Lqt {
327    pub fn new(block_time_s: f64) -> Self {
328        Self { block_time_s }
329    }
330
331    async fn index_event(
332        &self,
333        dbtx: &mut PgTransaction<'_>,
334        event: ContextualizedEvent<'_>,
335    ) -> anyhow::Result<()> {
336        if let Ok(e) = EventLqtVote::try_from_event(&event.event) {
337            _votes::add(
338                dbtx,
339                e.epoch_index,
340                e.voting_power,
341                e.incentivized_asset_id,
342                e.rewards_recipient,
343            )
344            .await?;
345        } else if let Ok(e) = EventLqtDelegatorReward::try_from_event(&event.event) {
346            _delegator_rewards::add(dbtx, e.epoch_index, e.address, e.reward_amount).await?;
347        } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
348            _lp_rewards::add_reward(dbtx, e.epoch_index, e.position_id, e.reward_amount).await?;
349        } else if let Ok(e) = EventLqtPositionVolume::try_from_event(&event.event) {
350            _lp_rewards::add_flows(
351                dbtx,
352                e.epoch_index,
353                e.position_id,
354                e.asset_id,
355                e.staking_token_in,
356                e.asset_in,
357                e.staking_fees,
358                e.asset_fees,
359                e.volume,
360            )
361            .await?;
362        } else if let Ok(e) = EventEpochRoot::try_from_event(&event.event) {
363            _finished_epochs::declare_finished(dbtx, e.index).await?;
364            _epoch_info::end_epoch(dbtx, e.index, event.block_height).await?;
365            _epoch_info::start_epoch(dbtx, e.index + 1, event.block_height + 1).await?;
366        } else if let Ok(e) = EventLqtPoolSizeIncrease::try_from_event(&event.event) {
367            _epoch_info::set_rewards_for_epoch(
368                dbtx,
369                e.epoch_index,
370                event.block_height,
371                e.new_total,
372            )
373            .await?;
374        } else if let Ok(e) = EventAppParametersChange::try_from_event(&event.event) {
375            let current = _epoch_info::current(dbtx).await?;
376            _params::set_epoch(dbtx, current, e.new_parameters.into()).await?;
377        }
378        Ok(())
379    }
380}
381
382#[async_trait]
383impl AppView for Lqt {
384    async fn init_chain(
385        &self,
386        dbtx: &mut PgTransaction,
387        app_state: &serde_json::Value,
388    ) -> Result<(), anyhow::Error> {
389        let content = parse_content(app_state.clone())?;
390        _params::set_initial(dbtx, content.into()).await?;
391        Ok(())
392    }
393
394    fn name(&self) -> String {
395        "lqt".to_string()
396    }
397
398    fn version(&self) -> Version {
399        let hash: [u8; 32] = blake2b_simd::Params::default()
400            .personal(b"option_hash")
401            .hash_length(32)
402            .to_state()
403            .update(&self.block_time_s.to_le_bytes())
404            .finalize()
405            .as_bytes()
406            .try_into()
407            .expect("Impossible 000-003: expected 32 byte hash");
408
409        Version::with_major(4).with_option_hash(hash)
410    }
411
412    async fn reset(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
413        for statement in include_str!("reset.sql").split(";") {
414            sqlx::query(statement).execute(dbtx.as_mut()).await?;
415        }
416        Ok(())
417    }
418
419    async fn on_startup(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
420        for statement in include_str!("schema.sql").split(";") {
421            sqlx::query(statement).execute(dbtx.as_mut()).await?;
422        }
423        _epoch_info::start_epoch(dbtx, 1, 1).await?;
424        Ok(())
425    }
426
427    async fn index_batch(
428        &self,
429        dbtx: &mut PgTransaction,
430        batch: EventBatch,
431        _ctx: EventBatchContext,
432    ) -> Result<(), anyhow::Error> {
433        _meta::update_with_batch(dbtx, &batch, self.block_time_s).await?;
434        for event in batch.events() {
435            self.index_event(dbtx, event).await?;
436        }
437        Ok(())
438    }
439}