1use anyhow::anyhow;
2use cometindex::{
3 async_trait,
4 index::{EventBatch, EventBatchContext, Version},
5 sqlx, AppView, ContextualizedEvent, PgTransaction,
6};
7use penumbra_sdk_app::{event::EventAppParametersChange, genesis::Content, params::AppParameters};
8use penumbra_sdk_asset::asset;
9use penumbra_sdk_dex::event::EventLqtPositionVolume;
10use penumbra_sdk_dex::lp::position;
11use penumbra_sdk_distributions::{event::EventLqtPoolSizeIncrease, DistributionsParameters};
12use penumbra_sdk_funding::{
13 event::{EventLqtDelegatorReward, EventLqtPositionReward, EventLqtVote},
14 FundingParameters,
15};
16use penumbra_sdk_keys::Address;
17use penumbra_sdk_num::Amount;
18use penumbra_sdk_proto::event::EventDomainType;
19use penumbra_sdk_sct::{event::EventEpochRoot, params::SctParameters};
20use sqlx::types::BigDecimal;
21
22use crate::parsing::parse_content;
23
/// The parameter groups this view reads when persisting a row of `lqt._params`.
///
/// Values are obtained from either a genesis `Content` or an `AppParameters`
/// through the `From` implementations in this file.
struct Parameters {
    /// Funding parameters; the `liquidity_tournament.gauge_threshold` and
    /// `liquidity_tournament.delegator_share` values are read from here.
    funding: FundingParameters,
    /// SCT parameters; `epoch_duration` is read from here.
    sct: SctParameters,
    /// Distributions parameters; `liquidity_tournament_incentive_per_block`
    /// is read from here.
    distribution: DistributionsParameters,
}
29
30impl From<Content> for Parameters {
31 fn from(value: Content) -> Self {
32 Self {
33 funding: value.funding_content.funding_params,
34 sct: value.sct_content.sct_params,
35 distribution: value.distributions_content.distributions_params,
36 }
37 }
38}
39
40impl From<AppParameters> for Parameters {
41 fn from(value: AppParameters) -> Self {
42 Self {
43 funding: value.funding_params,
44 sct: value.sct_params,
45 distribution: value.distributions_params,
46 }
47 }
48}
49
50mod _params {
51 use super::*;
52
53 pub async fn set_initial(
55 dbtx: &mut PgTransaction<'_>,
56 params: Parameters,
57 ) -> anyhow::Result<()> {
58 set_epoch(dbtx, 0, params).await
59 }
60
61 #[allow(dead_code)]
63 pub async fn set_epoch(
65 dbtx: &mut PgTransaction<'_>,
66 epoch: u64,
67 params: Parameters,
68 ) -> anyhow::Result<()> {
69 let gauge_threshold = params.funding.liquidity_tournament.gauge_threshold;
70 let delegator_share = params.funding.liquidity_tournament.delegator_share;
71 let epoch_duration = params.sct.epoch_duration;
72 let rewards_per_block = params.distribution.liquidity_tournament_incentive_per_block;
73 sqlx::query(
74 "
75 INSERT INTO lqt._params
76 VALUES ($1, $2::NUMERIC / 100, $3::NUMERIC / 100, $4, $5)
77 ON CONFLICT (epoch)
78 DO UPDATE SET
79 delegator_share = EXCLUDED.delegator_share,
80 gauge_threshold = EXCLUDED.gauge_threshold,
81 epoch_duration = EXCLUDED.epoch_duration,
82 rewards_per_block = EXCLUDED.rewards_per_block
83 ",
84 )
85 .bind(i64::try_from(epoch)?)
86 .bind(i64::try_from(delegator_share.to_percent())?)
87 .bind(i64::try_from(gauge_threshold.to_percent())?)
88 .bind(i64::try_from(epoch_duration)?)
89 .bind(i64::try_from(rewards_per_block)?)
90 .execute(dbtx.as_mut())
91 .await?;
92 Ok(())
93 }
94}
95
96mod _meta {
97 use penumbra_sdk_sct::event::EventBlockRoot;
98
99 use super::*;
100
101 const DEFAULT_BLOCK_TIME_SECS: f64 = 5.0;
102
103 pub async fn update_with_batch(
104 dbtx: &mut PgTransaction<'_>,
105 batch: &EventBatch,
106 ) -> anyhow::Result<()> {
107 let block_time_s = {
108 let mut timestamps = batch.events().filter_map(|e| {
109 EventBlockRoot::try_from_event(e.event)
110 .ok()
111 .map(|x| (x.height, x.timestamp_seconds as f64))
112 });
113 let first = timestamps.next();
114 let last = timestamps.last();
115 match (first, last) {
116 (Some((h0, t0)), Some((h1, t1))) => (t1 - t0) / (h1 - h0) as f64,
117 _ => DEFAULT_BLOCK_TIME_SECS,
118 }
119 };
120 let current_height = batch
121 .events_by_block()
122 .last()
123 .map(|x| x.height())
124 .ok_or(anyhow!("expected there to be at least one block in events"))?;
125 sqlx::query(
126 "
127 INSERT INTO lqt._meta VALUES (0, $1, $2)
128 ON CONFLICT (rowid)
129 DO UPDATE SET
130 current_height = EXCLUDED.current_height,
131 block_time_s = EXCLUDED.block_time_s
132 ",
133 )
134 .bind(i64::try_from(current_height)?)
135 .bind(block_time_s)
136 .execute(dbtx.as_mut())
137 .await?;
138 Ok(())
139 }
140}
141
142mod _epoch_info {
143 use super::*;
144
145 pub async fn start_epoch(
146 dbtx: &mut PgTransaction<'_>,
147 epoch: u64,
148 height: u64,
149 ) -> anyhow::Result<()> {
150 sqlx::query(
151 "INSERT INTO lqt._epoch_info VALUES ($1, $2, $2, NULL, 0) ON CONFLICT DO NOTHING",
152 )
153 .bind(i64::try_from(epoch)?)
154 .bind(i64::try_from(height)?)
155 .execute(dbtx.as_mut())
156 .await?;
157 Ok(())
158 }
159
160 pub async fn end_epoch(
161 dbtx: &mut PgTransaction<'_>,
162 epoch: u64,
163 height: u64,
164 ) -> anyhow::Result<()> {
165 sqlx::query(
166 "UPDATE lqt._epoch_info SET end_block = $2, updated_block = $2 WHERE epoch = $1",
167 )
168 .bind(i64::try_from(epoch)?)
169 .bind(i64::try_from(height)?)
170 .execute(dbtx.as_mut())
171 .await?;
172 Ok(())
173 }
174
175 pub async fn set_rewards_for_epoch(
176 dbtx: &mut PgTransaction<'_>,
177 epoch: u64,
178 height: u64,
179 amount: Amount,
180 ) -> anyhow::Result<()> {
181 sqlx::query("UPDATE lqt._epoch_info SET available_rewards = $2::NUMERIC, updated_block = $3 WHERE epoch = $1")
182 .bind(i64::try_from(epoch)?)
183 .bind(BigDecimal::from(amount.value()))
184 .bind(i64::try_from(height)?)
185 .execute(dbtx.as_mut())
186 .await?;
187 Ok(())
188 }
189
190 pub async fn current(dbtx: &mut PgTransaction<'_>) -> anyhow::Result<u64> {
191 let out: i32 =
192 sqlx::query_scalar("SELECT epoch FROM lqt._epoch_info ORDER BY epoch DESC LIMIT 1")
193 .fetch_one(dbtx.as_mut())
194 .await?;
195 Ok(u64::try_from(out)?)
196 }
197}
198
199mod _finished_epochs {
200 use super::*;
201
202 pub async fn declare_finished(dbtx: &mut PgTransaction<'_>, epoch: u64) -> anyhow::Result<()> {
206 sqlx::query("INSERT INTO lqt._finished_epochs VALUES ($1) ON CONFLICT DO NOTHING")
207 .bind(i64::try_from(epoch)?)
208 .execute(dbtx.as_mut())
209 .await?;
210 Ok(())
211 }
212}
213
214mod _delegator_rewards {
215 use super::*;
216
217 pub async fn add(
219 dbtx: &mut PgTransaction<'_>,
220 epoch: u64,
221 addr: Address,
222 amount: Amount,
223 ) -> anyhow::Result<()> {
224 sqlx::query(
225 "
226 INSERT INTO lqt._delegator_rewards
227 VALUES ($1, $2, $3)
228 ON CONFLICT (epoch, address)
229 DO UPDATE SET
230 amount = lqt._delegator_rewards.amount + EXCLUDED.amount
231 ",
232 )
233 .bind(i64::try_from(epoch)?)
234 .bind(addr.to_vec())
235 .bind(BigDecimal::from(amount.value()))
236 .execute(dbtx.as_mut())
237 .await?;
238 Ok(())
239 }
240}
241
242mod _lp_rewards {
243 use super::*;
244
245 pub async fn add_reward(
249 dbtx: &mut PgTransaction<'_>,
250 epoch: u64,
251 position_id: position::Id,
252 amount: Amount,
253 ) -> anyhow::Result<()> {
254 sqlx::query(
255 "
256 UPDATE lqt._lp_rewards
257 SET amount = amount + $3
258 WHERE epoch = $1 AND position_id = $2
259 ",
260 )
261 .bind(i64::try_from(epoch)?)
262 .bind(position_id.0)
263 .bind(BigDecimal::from(amount.value()))
264 .execute(dbtx.as_mut())
265 .await?;
266 Ok(())
267 }
268
269 pub async fn add_flows(
271 dbtx: &mut PgTransaction<'_>,
272 epoch: u64,
273 position_id: position::Id,
274 other_asset: asset::Id,
275 um_volume: Amount,
276 asset_volume: Amount,
277 um_fees: Amount,
278 asset_fees: Amount,
279 points: Amount,
280 ) -> anyhow::Result<()> {
281 sqlx::query(
282 "
283 INSERT INTO lqt._lp_rewards
284 VALUES ($1, $2, $3, 0, 1, $4, $5, $6, $7, $8)
285 ON CONFLICT (epoch, position_id)
286 DO UPDATE SET
287 asset_id = EXCLUDED.asset_id,
288 executions = lqt._lp_rewards.executions + 1,
289 um_volume = lqt._lp_rewards.um_volume + EXCLUDED.um_volume,
290 asset_volume = lqt._lp_rewards.asset_volume + EXCLUDED.asset_volume,
291 um_fees = lqt._lp_rewards.um_fees + EXCLUDED.um_fees,
292 asset_fees = lqt._lp_rewards.asset_fees + EXCLUDED.asset_fees,
293 points = lqt._lp_rewards.points + EXCLUDED.points
294 ",
295 )
296 .bind(i64::try_from(epoch)?)
297 .bind(position_id.0)
298 .bind(other_asset.to_bytes())
299 .bind(BigDecimal::from(um_volume.value()))
300 .bind(BigDecimal::from(asset_volume.value()))
301 .bind(BigDecimal::from(um_fees.value()))
302 .bind(BigDecimal::from(asset_fees.value()))
303 .bind(BigDecimal::from(points.value()))
304 .execute(dbtx.as_mut())
305 .await?;
306 Ok(())
307 }
308}
309
310mod _votes {
311 use super::*;
312
313 pub async fn add(
315 dbtx: &mut PgTransaction<'_>,
316 epoch: u64,
317 power: Amount,
318 asset_id: asset::Id,
319 addr: Address,
320 ) -> anyhow::Result<()> {
321 sqlx::query(
322 "
323 INSERT INTO lqt._votes
324 VALUES (DEFAULT, $1, $2, $3, $4)
325 ",
326 )
327 .bind(i64::try_from(epoch)?)
328 .bind(BigDecimal::from(power.value()))
329 .bind(asset_id.to_bytes())
330 .bind(addr.to_vec())
331 .execute(dbtx.as_mut())
332 .await?;
333 Ok(())
334 }
335}
336
/// Stateless app view, registered under the name `"lqt"`, that writes
/// liquidity-tournament events into the `lqt.*` tables.
#[derive(Debug)]
pub struct Lqt {}
339
340impl Lqt {
341 async fn index_event(
342 &self,
343 dbtx: &mut PgTransaction<'_>,
344 event: ContextualizedEvent<'_>,
345 ) -> anyhow::Result<()> {
346 if let Ok(e) = EventLqtVote::try_from_event(&event.event) {
347 _votes::add(
348 dbtx,
349 e.epoch_index,
350 e.voting_power,
351 e.incentivized_asset_id,
352 e.rewards_recipient,
353 )
354 .await?;
355 } else if let Ok(e) = EventLqtDelegatorReward::try_from_event(&event.event) {
356 _delegator_rewards::add(dbtx, e.epoch_index, e.address, e.reward_amount).await?;
357 } else if let Ok(e) = EventLqtPositionReward::try_from_event(&event.event) {
358 _lp_rewards::add_reward(dbtx, e.epoch_index, e.position_id, e.reward_amount).await?;
359 } else if let Ok(e) = EventLqtPositionVolume::try_from_event(&event.event) {
360 _lp_rewards::add_flows(
361 dbtx,
362 e.epoch_index,
363 e.position_id,
364 e.asset_id,
365 e.staking_token_in,
366 e.asset_in,
367 e.staking_fees,
368 e.asset_fees,
369 e.volume,
370 )
371 .await?;
372 } else if let Ok(e) = EventEpochRoot::try_from_event(&event.event) {
373 _finished_epochs::declare_finished(dbtx, e.index).await?;
374 _epoch_info::end_epoch(dbtx, e.index, event.block_height).await?;
375 _epoch_info::start_epoch(dbtx, e.index + 1, event.block_height + 1).await?;
376 } else if let Ok(e) = EventLqtPoolSizeIncrease::try_from_event(&event.event) {
377 _epoch_info::set_rewards_for_epoch(
378 dbtx,
379 e.epoch_index,
380 event.block_height,
381 e.new_total,
382 )
383 .await?;
384 } else if let Ok(e) = EventAppParametersChange::try_from_event(&event.event) {
385 let current = _epoch_info::current(dbtx).await?;
386 _params::set_epoch(dbtx, current, e.new_parameters.into()).await?;
387 }
388 Ok(())
389 }
390}
391
392#[async_trait]
393impl AppView for Lqt {
394 async fn init_chain(
395 &self,
396 dbtx: &mut PgTransaction,
397 app_state: &serde_json::Value,
398 ) -> Result<(), anyhow::Error> {
399 let content = parse_content(app_state.clone())?;
400 _params::set_initial(dbtx, content.into()).await?;
401 Ok(())
402 }
403
404 fn name(&self) -> String {
405 "lqt".to_string()
406 }
407
408 fn version(&self) -> Version {
409 Version::with_major(4)
410 }
411
412 async fn reset(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
413 for statement in include_str!("reset.sql").split(";") {
414 sqlx::query(statement).execute(dbtx.as_mut()).await?;
415 }
416 Ok(())
417 }
418
419 async fn on_startup(&self, dbtx: &mut PgTransaction) -> Result<(), anyhow::Error> {
420 for statement in include_str!("schema.sql").split(";") {
421 sqlx::query(statement).execute(dbtx.as_mut()).await?;
422 }
423 _epoch_info::start_epoch(dbtx, 1, 1).await?;
424 Ok(())
425 }
426
427 async fn index_batch(
428 &self,
429 dbtx: &mut PgTransaction,
430 batch: EventBatch,
431 _ctx: EventBatchContext,
432 ) -> Result<(), anyhow::Error> {
433 _meta::update_with_batch(dbtx, &batch).await?;
434 for event in batch.events() {
435 self.index_event(dbtx, event).await?;
436 }
437 Ok(())
438 }
439}